Runs an SQL query against a target database for each message batch and, for queries that return rows, replaces the batch with the result according to a codec.

# Config fields, showing default values
driver: mysql
dsn: ""
query: ""
args: []
result_codec: none

If a query contains arguments they can be set as an array of strings supporting interpolation functions in the args field.

In order to execute an SQL query for each message of the batch use this processor within a for_each processor:

- sql:
driver: mysql
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
- ${! json("document.foo") }
- ${! json("document.bar") }
- ${! meta("kafka_topic") }


The following is a list of supported drivers and their respective DSN formats:

  • mysql: [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
  • postgres: postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]

Please note that the postgres driver enforces SSL by default, you can override this with the parameter sslmode=disable if required.



A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres.


A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
dsn: foouser:foopassword@tcp(localhost:3306)/foodb


The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);


A list of arguments for the query to be resolved for each message batch. This field supports interpolation functions.

Type: array
Default: []


A codec to determine how resulting rows are converted into messages.

Type: string
Default: "none"

Result Codecs

When a query returns rows they are serialised according to a chosen codec, and the batch contents are replaced with the serialised result.


The result of the query is ignored and the message batch remains unchanged. If your query does not return rows then this is the appropriate codec.


The resulting rows are serialised into an array of JSON objects, where each object represents a row, where the key is the column name and the value is that columns value in the row.