sql

Runs an SQL query against a target database for each message batch and, for queries that return rows, replaces the batch with the result according to a codec.

sql:
  driver: mysql
  dsn: ""
  query: ""
  args: []
  result_codec: none

If a query contains arguments, set them in the args field as an array of strings. Each string supports interpolation functions.

To execute an SQL query for each message of the batch, use this processor within a for_each processor:

for_each:
  - sql:
      driver: mysql
      dsn: foouser:foopassword@tcp(localhost:3306)/foodb
      query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
      args:
        - ${!json_field:document.foo}
        - ${!json_field:document.bar}
        - ${!metadata:kafka_topic}

Drivers

The following is a list of supported drivers and their respective DSN formats:

  • mysql: [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
  • postgres: postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]

Note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.
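
As a rough illustration of the postgres DSN format and the sslmode parameter described above, a config might look like the following sketch. The credentials, host, port and database name are hypothetical placeholders, and the query is left empty as in the default config.

```yaml
sql:
  driver: postgres
  # Hypothetical credentials and address; sslmode=disable turns off
  # the SSL requirement that the driver otherwise enforces by default.
  dsn: postgresql://foouser:foopassword@localhost:5432/foodb?sslmode=disable
  query: ""
```

Disabling SSL is usually only appropriate for local development or for connections that are secured by other means.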

Fields

driver

string A database driver to use.

Options are: mysql, postgres.

dsn

string A Data Source Name to identify the target database.

# Examples
dsn: foouser:foopassword@tcp(localhost:3306)/foodb

query

string The query to run against the database.

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args

array A list of arguments for the query to be resolved for each message batch.

This field supports interpolation functions that are resolved batch-wide.

result_codec

string A codec to determine how resulting rows are converted into messages.

Result Codecs

When a query returns rows they are serialised according to a chosen codec, and the batch contents are replaced with the serialised result.

none

The result of the query is ignored and the message batch remains unchanged. If your query does not return rows then this is the appropriate codec.

json_array

The resulting rows are serialised into an array of JSON objects, where each object represents a row, each key is a column name, and each value is that column's value in the row.
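
As a minimal sketch of this codec, the following config assumes the same hypothetical footable and credentials used in the earlier examples, with a SELECT query in place of the INSERT. The trailing comment shows the approximate shape of the result; the values are invented for illustration, and the exact value types may depend on the driver.

```yaml
sql:
  driver: mysql
  dsn: foouser:foopassword@tcp(localhost:3306)/foodb
  # Returns rows, so a codec other than none is needed to keep the result.
  query: "SELECT foo, bar FROM footable WHERE baz = ?;"
  args:
    - ${!json_field:document.baz}
  result_codec: json_array

# Illustrative result for two matching rows (hypothetical values):
# [{"foo":"a","bar":"b"},{"foo":"c","bar":"d"}]
```

Because the batch contents are replaced by the serialised rows, any data from the original messages that is needed downstream would have to be preserved by other means before this processor runs.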