sql_raw

Executes an arbitrary SQL query for each message.

Introduced in version 3.65.0.

# Common config fields, showing default values
output:
  label: ""
  sql_raw:
    driver: ""
    dsn: ""
    query: ""
    args_mapping: ""
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Examples

Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:

output:
  sql_raw:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (id, name, topic) VALUES (?, ?, ?);"
    args_mapping: |
      root = [
        this.user.id,
        this.user.name,
        meta("kafka_topic"),
      ]

Fields

driver

A database driver to use.

Type: string
Options: mysql, postgres, clickhouse, mssql.

dsn

A Data Source Name to identify the target database.

Drivers

The following is a list of supported drivers and their respective DSN formats:

Driver      Data Source Name Format
clickhouse  clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
mysql       [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgres    postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
mssql       sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]

Please note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.

Type: string

# Examples
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on). The style to use for each driver is outlined in this table:

Driver      Placeholder Style
clickhouse  Dollar sign
mysql       Question mark
postgres    Dollar sign
mssql       Question mark

Type: string

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
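
For a driver that expects dollar-sign placeholders, the same kind of insert could look like the sketch below. It reuses the illustrative table, column and credential names from the other examples on this page; the fields this.foo and this.bar are hypothetical and stand in for whatever your messages contain.

```yaml
output:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    # postgres expects incrementing dollar-sign placeholders rather than ?
    query: "INSERT INTO footable (foo, bar, baz) VALUES ($1, $2, $3);"
    # Three placeholders, so the mapping yields an array of three values
    # (this.foo and this.bar are hypothetical message fields)
    args_mapping: |
      root = [
        this.foo,
        this.bar,
        meta("kafka_topic"),
      ]
```

The values in the array are bound to the placeholders in order, so the first element fills $1, the second $2, and so on.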

args_mapping

An optional Bloblang mapping which should evaluate to an array of values whose length matches the number of placeholder arguments in the query field.

Type: string

# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]

max_in_flight

The maximum number of inserts to run in parallel.

Type: int
Default: 64

conn_max_idle_time

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's idle time.

Type: string

conn_max_life_time

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.

Type: string

conn_max_idle

An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.

Type: int

conn_max_open

An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).

Type: int
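
The four connection pool fields can be combined as in the following sketch. All values are illustrative only, and it assumes that the two duration fields accept strings in the same style as batching.period (for example 30s or 5m), which this page does not state explicitly.

```yaml
output:
  sql_raw:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    query: "INSERT INTO footable (foo) VALUES ($1);"
    args_mapping: root = [ this.foo ]
    # Assumed duration format: close connections idle for over 5 minutes
    conn_max_idle_time: 5m
    # Assumed duration format: stop reusing connections older than 1 hour
    conn_max_life_time: 1h
    # Keep at most 4 idle connections in the pool
    conn_max_idle: 4
    # Allow at most 16 open connections; conn_max_idle stays below this,
    # so it is not reduced to match
    conn_max_open: 16
```

Keeping conn_max_idle at or below conn_max_open avoids the automatic reduction described above.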

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"
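
To show where the batching fields sit within the output, here is a minimal sketch that combines a count, a period and a check. The connection details and table are the illustrative ones used elsewhere on this page, and the type field in the check is a hypothetical message field.

```yaml
output:
  sql_raw:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (id, name) VALUES (?, ?);"
    args_mapping: root = [ this.user.id, this.user.name ]
    batching:
      # Flush once 100 messages have accumulated
      count: 100
      # Size-based batching left disabled
      byte_size: 0
      # Flush an incomplete batch after one second
      period: 1s
      # Flush when a message marks the end of a transaction
      # (this.type is a hypothetical field)
      check: this.type == "end_of_transaction"
```

A batch is flushed as soon as any one of the enabled conditions is met.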

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array