sql_raw
Runs an arbitrary SQL query against a database and (optionally) returns the result as an array of objects, one for each row returned.
Introduced in version 3.65.0.
# Common config fields, showing default values
label: ""
sql_raw:
  driver: ""
  dsn: ""
  query: ""
  args_mapping: ""
  exec_only: false

# All config fields, showing default values
label: ""
sql_raw:
  driver: ""
  dsn: ""
  query: ""
  unsafe_dynamic_query: false
  args_mapping: ""
  exec_only: false
  conn_max_idle_time: ""
  conn_max_life_time: ""
  conn_max_idle: 0
  conn_max_open: 0

If the query fails to execute then the message will remain unchanged and the error can be caught using the standard error handling methods.
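As a minimal sketch of catching such a failure, the config below follows sql_raw with a catch block that logs the error. It assumes the catch and log processors and the Bloblang error() function are available in your version; the table, column and DSN values are placeholders rather than anything prescribed by this page.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        query: "INSERT INTO footable (foo) VALUES ($1);"
        args_mapping: 'root = [ this.foo ]'
        exec_only: true

    # Assumption: catch runs its children only for messages that a
    # previous processor (here sql_raw) has flagged as failed.
    - catch:
        - log:
            level: ERROR
            message: 'sql_raw failed: ${! error() }'
```

Because the message is left unchanged on failure, the original payload is still available inside the catch block for retries or dead-letter routing.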
Examples

Table Insert (MySQL)
The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages.

pipeline:
  processors:
    - sql_raw:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args_mapping: '[ document.foo, document.bar, meta("kafka_topic") ]'
        exec_only: true

Table Query (PostgreSQL)
Here we query a database for rows of footable that share a user_id with the message field user.id. A branch processor is used in order to insert the resulting array into the original message at the path foo_rows.

pipeline:
  processors:
    - branch:
        processors:
          - sql_raw:
              driver: postgres
              dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
              query: "SELECT * FROM footable WHERE user_id = $1;"
              args_mapping: '[ this.user.id ]'
        result_map: 'root.foo_rows = this'

Fields
driver
A database driver to use.
Type: string
Options: mysql, postgres, clickhouse, mssql.
dsn
A Data Source Name to identify the target database.
Drivers
The following is a list of supported drivers and their respective DSN formats (placeholder styles are listed under the query field):
Driver | Data Source Name Format |
---|---|
clickhouse | clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN] |
mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] |
postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |
mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...] |
Please note that the postgres driver enforces SSL by default; you can override this with the parameter sslmode=disable if required.
Type: string
# Examples

dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

dsn: foouser:foopassword@tcp(localhost:3306)/foodb

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

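The examples above do not cover mssql, so here is a sketch of a processor using that driver. The DSN simply fills in the sqlserver:// format from the drivers table; the host, port, credentials, database and table names are hypothetical, and the question mark placeholder follows the placeholder table under the query field.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: mssql
        # Hypothetical values following the format
        # sqlserver://[user[:password]@][netloc][:port][?database=dbname&...]
        dsn: sqlserver://foouser:foopass@localhost:1433?database=foodb
        query: "INSERT INTO footable (foo) VALUES (?);"
        args_mapping: 'root = [ this.foo ]'
        exec_only: true
```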
query
The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on). The style to use is outlined in this table:
Driver | Placeholder Style |
---|---|
clickhouse | Dollar sign |
mysql | Question mark |
postgres | Dollar sign |
mssql | Question mark |
Type: string
# Examples

query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

query: SELECT * FROM footable WHERE user_id = $1;

unsafe_dynamic_query
Whether to enable interpolation functions in the query. Great care should be taken to ensure your queries are defended against injection attacks.
Type: bool
Default: false
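To illustrate what enabling this field allows, the sketch below interpolates a table name from message metadata into the query. The metadata key table_name is hypothetical and is assumed to come from a trusted source; values that originate from message content are still passed through placeholders rather than interpolated.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        # The interpolation below is only resolved because
        # unsafe_dynamic_query is set to true.
        query: 'SELECT * FROM ${! meta("table_name") } WHERE user_id = $1;'
        unsafe_dynamic_query: true
        # Untrusted values stay in placeholders, not in the query string.
        args_mapping: 'root = [ this.user.id ]'
```

Restricting interpolation to identifiers such as table names, and validating them upstream, keeps the injection surface as small as possible.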
args_mapping
An optional Bloblang mapping that should evaluate to an array of values whose length matches the number of placeholder arguments in the query field.
Type: string
# Examples

args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

args_mapping: root = [ meta("user.id") ]

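The size-matching requirement can be seen more clearly in a complete processor. In this sketch the query has three placeholders and the mapping yields an array of exactly three values, in the same order; the table, columns and message fields are hypothetical.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        # Three placeholders: $1, $2 and $3.
        query: "UPDATE footable SET bar = $1, baz = $2 WHERE foo = $3;"
        # Three array elements, bound to $1, $2 and $3 respectively.
        args_mapping: 'root = [ this.bar, meta("kafka_topic"), this.foo ]'
        exec_only: true
```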
exec_only
Whether the query result should be discarded. When set to true the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc.
Type: bool
Default: false
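For contrast, the sketch below leaves exec_only at its default of false and does not wrap the processor in a branch, so the message contents are replaced by the query result (an array of objects, one per returned row). The table, columns and message field are hypothetical.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "SELECT foo, bar FROM footable WHERE baz = ?;"
        args_mapping: 'root = [ this.baz ]'
        # exec_only is omitted, so it defaults to false and the
        # returned rows become the new message contents.
```

If the original message needs to be preserved alongside the rows, a branch processor with a result_map is the approach shown in the Table Query example.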
conn_max_idle_time
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's idle time.
Type: string
conn_max_life_time
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.
Type: string
conn_max_idle
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.
Type: int
conn_max_open
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
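The four connection pool fields are usually tuned together, which the following sketch illustrates. The numbers are arbitrary illustrations rather than recommendations, and the duration strings (5m, 30m) assume that the time fields accept values in that form.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        query: "SELECT * FROM footable WHERE user_id = $1;"
        args_mapping: 'root = [ this.user.id ]'
        # Assumed duration format; idle connections may be closed after 5 minutes.
        conn_max_idle_time: 5m
        # Connections may be closed once they are 30 minutes old.
        conn_max_life_time: 30m
        # Keep at most 4 idle connections, which is below conn_max_open,
        # so the idle limit is not reduced.
        conn_max_idle: 4
        # Allow at most 8 open connections to the database.
        conn_max_open: 8
```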