Skip to main content

sql_insert

EXPERIMENTAL

This component is experimental and therefore subject to change or removal outside of major version releases.

Inserts rows into an SQL database for each message, and leaves the message unchanged.

Introduced in version 3.59.0.

# Common config fields, showing default values
label: ""
sql_insert:
driver: ""
dsn: ""
table: ""
columns: []
args_mapping: ""

If the insert fails to execute then the message will still remain unchanged and the error can be caught using error handling methods outlined here.

Examples#

Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:

pipeline:
processors:
- sql_insert:
driver: mysql
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
table: footable
columns: [ id, name, topic ]
args_mapping: |
root = [
this.user.id,
this.user.name,
meta("kafka_topic"),
]

Fields#

driver#

A database driver to use.

Type: string
Options: mysql, postgres, clickhouse, mssql.

dsn#

A Data Source Name to identify the target database.

Drivers#

The following is a list of supported drivers and their respective DSN formats:

DriverData Source Name Format
clickhousetcp://[netloc][:port][?param1=value1&...&paramN=valueN]
mysql[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgrespostgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
mssqlsqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]

Please note that the postgres driver enforces SSL by default, you can override this with the parameter sslmode=disable if required.

Type: string

# Examples
dsn: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

table#

The table to insert to.

Type: string

# Examples
table: foo

columns#

A list of columns to insert.

Type: array

# Examples
columns:
- foo
- bar
- baz

args_mapping#

A Bloblang mapping which should evaluate to an array of values matching in size to the number of columns specified.

Type: string

# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]

prefix#

An optional prefix to prepend to the insert query (before INSERT).

Type: string

suffix#

An optional suffix to append to the insert query.

Type: string