sql

BETA

This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Runs an SQL prepared query against a target database for each message.

Introduced in version 3.33.0.

# Common config fields, showing default values
output:
  label: ""
  sql:
    driver: mysql
    data_source_name: ""
    query: ""
    args_mapping: ""
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Query arguments are set with a Bloblang mapping in the args_mapping field.

Drivers

The following is a list of supported drivers and their respective DSN formats:

| Driver | Data Source Name Format |
|---|---|
| clickhouse | tcp://[netloc][:port][?param1=value1&...&paramN=valueN] |
| mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] |
| postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |
| mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...] |

Please note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.
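
As an illustration of the SSL override, the following is a minimal sketch of a postgres output. The credentials, host, table and column names are placeholders, and the numbered placeholder style ($1, $2, $3) is an assumption about what the postgres driver expects rather than something stated above:

```yaml
output:
  sql:
    driver: postgres
    # Hypothetical credentials and host; sslmode=disable turns off the
    # SSL requirement that the postgres driver enforces by default.
    data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    # Assumption: postgres uses numbered placeholders instead of "?".
    query: "INSERT INTO footable (foo, bar, baz) VALUES ($1, $2, $3);"
    args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
```

Only disable SSL when the connection is otherwise trusted, for example against a local development database.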

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
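
The two tuning options can be combined. The sketch below is illustrative only: the values 4, 100 and 1s are arbitrary starting points rather than recommendations, and the connection details are placeholders:

```yaml
output:
  sql:
    driver: mysql
    data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
    args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
    # Illustrative value: allow up to four messages in flight at once.
    max_in_flight: 4
    batching:
      # Flush after 100 messages, or after one second if the batch
      # has not filled up by then.
      count: 100
      period: 1s
```

Setting a period alongside count keeps a partially filled batch from waiting indefinitely when traffic is low.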

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

output:
  sql:
    driver: mysql
    data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
    args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
    batching:
      count: 500

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args_mapping

A Bloblang mapping that produces the arguments for the query. The mapping must return an array with exactly as many elements as there are argument placeholders in the query.

Type: string
Default: ""
Requires version 3.47.0 or newer

# Examples
args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'
args_mapping: root = [ uuid_v4() ].merge(this.document.args)
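
To make the relationship between placeholders and array elements concrete, here is a minimal sketch. The table, the column names id and name, and the message field this.name are hypothetical:

```yaml
output:
  sql:
    driver: mysql
    data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
    # Two placeholders in the query...
    query: "INSERT INTO footable (id, name) VALUES (?, ?);"
    # ...so the mapping returns an array of exactly two elements,
    # in the same order as the placeholders.
    args_mapping: 'root = [ uuid_v4(), this.name ]'
```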

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 1

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.period

A period after which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}