sql

Runs an SQL prepared query against a target database for each message and, for queries that return rows, replaces the message contents with the result according to a codec.

# Config fields, showing default values
sql:
  driver: mysql
  data_source_name: ""
  query: ""
  args: []
  result_codec: none

If a query contains arguments, set them in the args field as an array of strings. Each string supports interpolation functions.

Drivers

The following is a list of supported drivers and their respective DSN formats:

Driver      Data Source Name Format
clickhouse  tcp://[netloc][:port][?param1=value1&...&paramN=valueN]
mysql       [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgres    postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]

Please note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.
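As a minimal sketch of the sslmode override in context, the fragment below points the processor at a local PostgreSQL instance with SSL disabled. The table name heartbeat, its column seen_at, and the credentials are hypothetical and used only for illustration. The query deliberately takes no arguments so that the example does not depend on the driver's placeholder syntax.

```yaml
pipeline:
  processors:
    - sql:
        driver: postgres
        # sslmode=disable turns off the SSL requirement for this connection.
        data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        # Hypothetical table and column, for illustration only.
        query: "INSERT INTO heartbeat (seen_at) VALUES (now());"
        result_codec: none
```

Disabling SSL is typically only appropriate for local development or a trusted network.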

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args:
          - ${! json("document.foo") }
          - ${! json("document.bar") }
          - ${! meta("kafka_topic") }

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args

A list of arguments for the query to be resolved for each message. This field supports interpolation functions.

Type: array
Default: []

result_codec

A codec to determine how resulting rows are converted into messages.

Type: string
Default: "none"
Options: none, json_array.

Result Codecs

When a query returns rows they are serialised according to a chosen codec, and the message contents are replaced with the serialised result.

none

The result of the query is ignored and the message remains unchanged. If your query does not return rows then this is the appropriate codec.

json_array

The resulting rows are serialised into an array of JSON objects, where each object represents a row. Within each object, the key is the column name and the value is that column's value in the row.
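As a sketch of the json_array codec in use, the following config runs a SELECT for each message and replaces the message contents with the matching rows. It reuses the footable table and the columns from the insert example above. The document.baz path in the incoming message is an assumption made for illustration.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        # Select two columns, filtered by a value taken from the message.
        query: "SELECT foo, bar FROM footable WHERE baz = ?;"
        args:
          # Assumed message field, for illustration only.
          - ${! json("document.baz") }
        # Serialise the returned rows as an array of JSON objects.
        result_codec: json_array
```

With this config, a query that matches two rows would be expected to produce a message shaped like [{"foo": ..., "bar": ...}, {"foo": ..., "bar": ...}], with one object per row and one key per selected column. The exact value types depend on the driver and the column types.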