sql_raw
Executes an arbitrary SQL query for each message.
Introduced in version 3.65.0.
- Common
- Advanced
# Common config fields, showing default valuesoutput:label: ""sql_raw:driver: ""dsn: ""query: ""args_mapping: ""max_in_flight: 64batching:count: 0byte_size: 0period: ""check: ""
# All config fields, showing default valuesoutput:label: ""sql_raw:driver: ""dsn: ""query: ""args_mapping: ""max_in_flight: 64conn_max_idle_time: ""conn_max_life_time: ""conn_max_idle: 0conn_max_open: 0batching:count: 0byte_size: 0period: ""check: ""processors: []
Examples​
- Table Insert (MySQL)
Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:
output:sql_raw:driver: mysqldsn: foouser:foopassword@tcp(localhost:3306)/foodbquery: "INSERT INTO footable (id, name, topic) VALUES (?, ?, ?);"args_mapping: |root = [this.user.id,this.user.name,meta("kafka_topic"),]
Fields​
driver
​
A database driver to use.
Type: string
Options: mysql
, postgres
, clickhouse
, mssql
.
dsn
​
A Data Source Name to identify the target database.
Drivers​
The following is a list of supported drivers, their placeholder style, and their respective DSN formats:
Driver | Data Source Name Format |
---|---|
clickhouse | clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...¶mN=valueN] |
mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] |
postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |
mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname¶m1=value1&...] |
Please note that the postgres
driver enforces SSL by default, you can override this with the parameter sslmode=disable
if required.
Type: string
# Examplesdsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60dsn: foouser:foopassword@tcp(localhost:3306)/foodbdsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
query
​
The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (?
) whereas others expect incrementing dollar signs ($1
, $2
, and so on). The style to use is outlined in this table:
Driver | Placeholder Style |
---|---|
clickhouse | Dollar sign |
mysql | Question mark |
postgres | Dollar sign |
mssql | Question mark |
Type: string
# Examplesquery: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
args_mapping
​
An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query
.
Type: string
# Examplesargs_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]args_mapping: root = [ meta("user.id") ]
max_in_flight
​
The maximum number of inserts to run in parallel.
Type: int
Default: 64
conn_max_idle_time
​
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's idle time.
Type: string
conn_max_life_time
​
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.
Type: string
conn_max_idle
​
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.
Type: int
conn_max_open
​
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
batching
​
Allows you to configure a batching policy.
Type: object
# Examplesbatching:byte_size: 5000count: 0period: 1sbatching:count: 10period: 1sbatching:check: this.contains("END BATCH")count: 0period: 1m
batching.count
​
A number of messages at which the batch should be flushed. If 0
disables count based batching.
Type: int
Default: 0
batching.byte_size
​
An amount of bytes at which the batch should be flushed. If 0
disables size based batching.
Type: int
Default: 0
batching.period
​
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examplesperiod: 1speriod: 1mperiod: 500ms
batching.check
​
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examplescheck: this.type == "end_of_transaction"
batching.processors
​
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examplesprocessors:- archive:format: concatenateprocessors:- archive:format: linesprocessors:- archive:format: json_array