
sql

Runs a prepared SQL query against a target database for each message and, for queries that return rows, replaces the message contents with the result, serialised according to a codec.

# Common config fields, showing default values
label: ""
sql:
  driver: mysql
  data_source_name: ""
  query: ""
  args_mapping: ""
  result_codec: none

If a query contains arguments they can be set with a Bloblang mapping in the args_mapping field, which must return an array with one element per argument.

Drivers

The following is a list of supported drivers and their respective DSN formats:

| Driver | Data Source Name Format |
| --- | --- |
| clickhouse | tcp://[netloc][:port][?param1=value1&...&paramN=valueN] |
| mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] |
| postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |
| mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...] |

Please note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.
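As a minimal sketch of a postgres configuration with SSL disabled: the host, credentials, table and field names are placeholders, and the numbered placeholder style ($1, $2) is an assumption based on the usual convention of Go postgres drivers rather than anything stated on this page.

```yaml
pipeline:
  processors:
    - sql:
        driver: postgres
        # sslmode=disable turns off the SSL requirement, e.g. for a local test database.
        data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        # Assumption: postgres expects numbered placeholders ($1, $2) rather than ?.
        query: "INSERT INTO footable (foo, bar) VALUES ($1, $2);"
        args_mapping: '[ this.foo, this.bar ]'
```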

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args_mapping: '[ document.foo, document.bar, meta("kafka_topic") ]'

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

unsafe_dynamic_query

Whether to enable dynamic queries that support interpolation functions. WARNING: This feature opens up the possibility of SQL injection attacks and is considered unsafe.

Type: bool
Default: false
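A hedged sketch of what a dynamic query might look like: here the table name is taken from a metadata key. The key name "table" and the field this.foo are hypothetical, and the values themselves are still passed as query arguments rather than interpolated.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        # Assumption: the target table name is carried in a metadata key named "table".
        # Interpolating it into the query text is what makes this unsafe.
        query: 'INSERT INTO ${! meta("table") } (foo) VALUES (?);'
        unsafe_dynamic_query: true
        # Values still go through placeholders, not interpolation.
        args_mapping: '[ this.foo ]'
```

Only enable this when the interpolated value comes from a trusted source, since anything placed in the query text bypasses the prepared statement's argument handling.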

args_mapping

A Bloblang mapping that produces the arguments for the query. The mapping must return an array whose length matches the number of arguments in the query.

Type: string
Default: ""
Requires version 3.47.0 or newer

# Examples
args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'
args_mapping: root = [ uuid_v4() ].merge(this.document.args)
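To illustrate the length requirement, the sketch below pairs a query containing two placeholders with a mapping that returns a two-element array. The table, column and field names (this.user.id, this.user.name) are hypothetical.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        # Two placeholders in the query...
        query: "INSERT INTO users (id, name) VALUES (?, ?);"
        # ...so the mapping returns an array of exactly two elements, in the same order.
        args_mapping: '[ this.user.id, this.user.name ]'
```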

result_codec

A codec to determine how resulting rows are converted into messages.

Type: string
Default: "none"
Options: none, json_array.

Result Codecs

When a query returns rows they are serialised according to a chosen codec, and the message contents are replaced with the serialised result.

none

The result of the query is ignored and the message remains unchanged. If your query does not return rows then this is the appropriate codec.

json_array

The resulting rows are serialised into an array of JSON objects. Each object represents a row, where each key is a column name and each value is that column's value in the row.
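A minimal sketch of a query that returns rows, assuming the branch processor is available to keep the original message and attach the result to it. The table, columns, the user.id field and the foo_rows target field are all hypothetical.

```yaml
pipeline:
  processors:
    - branch:
        processors:
          - sql:
              driver: mysql
              data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
              query: "SELECT foo, bar FROM footable WHERE user_id = ?;"
              # Assumption: incoming messages carry the id at user.id.
              args_mapping: '[ this.user.id ]'
              result_codec: json_array
        # Store the array of row objects under a new field of the original message.
        result_map: 'root.foo_rows = this'
```

With this codec each element of the resulting array would be an object with the keys foo and bar, one per returned row. Without the branch wrapper, the array would replace the message contents entirely.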