sql

BETA: This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Runs an SQL prepared query against a target database for each message.

Introduced in version 3.33.0.

# Common config fields, showing default values
output:
  sql:
    driver: mysql
    data_source_name: ""
    query: ""
    args: []
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Query arguments are set using interpolation functions in the args field.
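As a sketch of how this works: each element of args is resolved against the message and bound, in order, to the corresponding placeholder in the query. The table, column, and field names below are illustrative assumptions, not part of the component docs:

```yaml
# Hypothetical config: each args entry fills the matching `?` placeholder in order
output:
  sql:
    driver: mysql
    data_source_name: user:pass@tcp(localhost:3306)/exampledb
    query: "INSERT INTO events (id, payload) VALUES (?, ?);"
    args:
      - ${! json("id") }   # bound to the first `?`
      - ${! content() }    # bound to the second `?`
```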

Drivers

The following is a list of supported drivers and their respective DSN formats:

Driver      Data Source Name Format
clickhouse  tcp://[netloc][:port][?param1=value1&...&paramN=valueN]
mysql       [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgres    postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]

Please note that the postgres driver enforces SSL by default; you can override this with the parameter sslmode=disable if required.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
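As a sketch of combining both tunings, the config below raises the in-flight limit and forms batches at the output level. The values and DSN are illustrative starting points, not recommendations from the component docs:

```yaml
output:
  sql:
    driver: mysql
    data_source_name: user:pass@tcp(localhost:3306)/exampledb
    query: "INSERT INTO events (id, payload) VALUES (?, ?);"
    args:
      - ${! json("id") }
      - ${! content() }
    max_in_flight: 10  # up to 10 writes dispatched in parallel
    batching:
      count: 200       # flush once 200 messages have accumulated
      period: 1s       # or after 1s, whichever comes first
```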

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

output:
  sql:
    driver: mysql
    data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
    args:
      - ${! json("document.foo") }
      - ${! json("document.bar") }
      - ${! meta("kafka_topic") }
    batching:
      count: 500

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args

A list of arguments for the query to be resolved for each message. This field supports interpolation functions.

Type: array
Default: []

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: number
Default: 1

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.

Type: number
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.

Type: number
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
  - archive:
      format: lines
processors:
  - archive:
      format: json_array
processors:
  - merge_json: {}