sql

Runs an SQL prepared query against a target database for each message and, for queries that return rows, replaces the message contents with the result, serialised according to a codec.

# Config fields, showing default values
label: ""
sql:
  driver: mysql
  data_source_name: ""
  query: ""
  args: []
  result_codec: none

If a query contains arguments, set them in the args field as an array of strings. Each string supports interpolation functions.

Drivers

The following is a list of supported drivers and their respective DSN formats:

| Driver | Data Source Name Format |
|---|---|
| clickhouse | tcp://[netloc][:port][?param1=value1&...&paramN=valueN] |
| mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] |
| postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |

Please note that the postgres driver enforces SSL by default. You can override this by adding the parameter sslmode=disable to the data source name if required.
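
As a minimal sketch of the note above, the following config points the processor at a local Postgres instance with SSL disabled. The DSN is the Postgres example listed under the data_source_name field. The table footable and its columns are hypothetical, and the query deliberately takes no arguments.

```yaml
pipeline:
  processors:
    - sql:
        driver: postgres
        # sslmode=disable turns off the SSL requirement enforced by default
        data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        # footable, foo and bar are placeholder names for illustration
        query: "SELECT foo, bar FROM footable;"
        result_codec: json_array
```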

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args:
          - ${! json("document.foo") }
          - ${! json("document.bar") }
          - ${! meta("kafka_topic") }

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args

A list of arguments for the query to be resolved for each message. This field supports interpolation functions.

Type: array
Default: []
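
A short sketch of how args lines up with a query: each entry fills the corresponding ? placeholder in order and is resolved again for every message. The table and column names are hypothetical, and the literal second argument assumes that a string containing no interpolation function is passed through unchanged.

```yaml
sql:
  driver: mysql
  data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
  # footable, foo and source are placeholder names for illustration
  query: "INSERT INTO footable (foo, source) VALUES (?, ?);"
  args:
    # First placeholder: resolved from each message's JSON payload
    - ${! json("document.foo") }
    # Second placeholder: a static string (assumed to be passed as-is)
    - my-pipeline
```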

result_codec

A codec to determine how resulting rows are converted into messages.

Type: string
Default: "none"
Options: none, json_array.

Result Codecs

When a query returns rows they are serialised according to a chosen codec, and the message contents are replaced with the serialised result.

none

The result of the query is ignored and the message remains unchanged. If your query does not return rows then this is the appropriate codec.

json_array

The resulting rows are serialised into an array of JSON objects. Each object represents a row, where each key is a column name and each value is that column's value in the row.
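
To illustrate, a lookup query might be configured as in the sketch below. The table footable, the column user_id and the message field user.id are hypothetical names chosen for this example.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        # user_id and user.id are placeholder names for this example
        query: "SELECT foo, bar FROM footable WHERE user_id = ?;"
        args:
          - ${! json("user.id") }
        result_codec: json_array
```

If two rows match, the message contents would be replaced with an array of two objects, each carrying the keys foo and bar. How individual values are encoded likely depends on the driver and the column type, so it is worth checking the output against your own schema.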