cassandra

BETA

This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Runs a query against a Cassandra database for each message in order to insert data.

# Common config fields, showing default values
output:
  label: ""
  cassandra:
    addresses: []
    query: ""
    args_mapping: ""
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Query arguments can be set either with interpolation functions in the args field, or by building a Bloblang array of the argument values with the args_mapping field.
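As a rough sketch of the interpolation approach, the fragment below assumes that args accepts a list of strings, one per `?` placeholder, and that each entry is resolved per message with an interpolation function such as json. The two-column query is shortened for illustration and is not taken from the reference above.

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content) VALUES (?, ?)'
    # Assumption: one list entry per placeholder, in the same order as the query.
    args:
      - '${! json("id") }'
      - '${! json("content") }'
```

The args_mapping field is usually the more flexible option, since a single mapping can build and transform all of the arguments at once.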

When populating timestamp columns, the value must be either a string in ISO 8601 format (2006-01-02T15:04:05Z07:00) or an integer representing Unix time in seconds.
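To illustrate the timestamp rule, here is a minimal sketch that fills a timestamp column with the time of processing rather than a field from the message. It assumes the Bloblang now() function, which returns the current time as an ISO 8601 string; the table and column names are borrowed from the example further down this page.

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: |
      root = [
        this.id,
        this.content,
        now() # ISO 8601 string; an integer of unix seconds would also be accepted
      ]
```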

Performance#

Throughput improves when multiple messages are sent in flight in parallel. You can tune the maximum number of in-flight messages with the max_in_flight field.

Throughput also improves when messages are sent as batches. Batches can be formed at both the input and output level. You can find out more in this doc.
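The two tuning options can be combined. The fragment below is only a sketch: the values 64, 100 and 1s are illustrative assumptions rather than recommendations, and suitable numbers depend on the cluster and the workload.

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, this.timestamp ]'
    # Allow up to 64 messages (or batches) to be in flight at once.
    max_in_flight: 64
    batching:
      # Flush after 100 messages, or after 1s if the batch is still incomplete.
      count: 100
      period: 1s
```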

Examples#

Suppose we create a table with some basic columns using CREATE TABLE foo.bar (id int primary key, content text, created_at timestamp);, and are processing JSON documents of the form {"id":"342354354","content":"hello world","timestamp":1605219406}. We could populate the table with the following config:

output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: |
      root = [
        this.id,
        this.content,
        this.timestamp
      ]
    batching:
      count: 500

Fields#

addresses#

A list of Cassandra nodes to connect to. Multiple comma-separated addresses can be specified on a single line.

Type: array
Default: []

# Examples

addresses:
  - localhost:9042

addresses:
  - foo:9042
  - bar:9042

addresses:
  - foo:9042,bar:9042

tls#

Custom TLS settings can be used to override system defaults.

Type: object

tls.enabled#

Whether custom TLS settings are enabled.

Type: bool
Default: false

tls.skip_cert_verify#

Whether to skip server-side certificate verification.

Type: bool
Default: false

tls.enable_renegotiation#

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message local error: tls: no renegotiation.

Type: bool
Default: false
Requires version 3.45.0 or newer

tls.root_cas#

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

# Examples

root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file#

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

# Examples
root_cas_file: ./root_cas.pem

tls.client_certs#

A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.

Type: array
Default: []

# Examples

client_certs:
  - cert: foo
    key: bar

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert#

A plain text certificate to use.

Type: string
Default: ""

tls.client_certs[].key#

A plain text certificate key to use.

Type: string
Default: ""

tls.client_certs[].cert_file#

The path to a certificate to use.

Type: string
Default: ""

tls.client_certs[].key_file#

The path of a certificate key to use.

Type: string
Default: ""
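Putting the tls fields together, a connection that trusts a custom root CA and presents a client certificate might be configured roughly as follows. The host name cassandra.example.com is hypothetical, and the file paths reuse the placeholder paths from the examples above.

```yaml
output:
  cassandra:
    addresses:
      - cassandra.example.com:9042 # hypothetical host
    tls:
      enabled: true
      # Trust the CA that signed the server certificate.
      root_cas_file: ./root_cas.pem
      # Present a client certificate; use either cert/key or cert_file/key_file.
      client_certs:
        - cert_file: ./example.pem
          key_file: ./example.key
```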

password_authenticator#

An object containing the username and password.

Type: object

password_authenticator.enabled#

Whether to use password authentication.

Type: bool
Default: false

password_authenticator.username#

A username.

Type: string
Default: ""

password_authenticator.password#

A password.

Type: string
Default: ""
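A sketch of password authentication is shown below. The user name and the CASSANDRA_PASSWORD environment variable are hypothetical, and the fragment assumes that environment variables are interpolated into the config with the ${VAR} syntax so that the secret stays out of the file.

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    password_authenticator:
      enabled: true
      username: cassandra_user             # hypothetical user
      password: "${CASSANDRA_PASSWORD}"    # hypothetical environment variable
```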

disable_initial_host_lookup#

If enabled, the driver will not attempt to get host info from the system.peers table. This can speed up queries, but data_centre, rack and token information will not be available.

Type: bool
Default: false

query#

A query to execute for each message.

Type: string
Default: ""

args_mapping#

A Bloblang mapping that can be used to provide arguments to Cassandra queries. The result of the mapping must be an array containing one element for each argument placeholder in the query.

Type: string
Default: ""
Requires version 3.55.0 or newer

consistency#

The consistency level to use.

Type: string
Default: "QUORUM"
Options: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE.

max_retries#

The maximum number of retries before giving up on a request.

Type: int
Default: 3

backoff#

Control time intervals between retry attempts.

Type: object

backoff.initial_interval#

The initial period to wait between retry attempts.

Type: string
Default: "1s"

backoff.max_interval#

The maximum period to wait between retry attempts.

Type: string
Default: "5s"
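The consistency and retry fields can be set alongside each other. In this sketch the chosen values are illustrative assumptions only: LOCAL_QUORUM is one of the listed options, and the retry count and intervals simply differ from the defaults to show the shape of the config.

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    consistency: LOCAL_QUORUM
    # Give up on a request after 5 retries.
    max_retries: 5
    backoff:
      # Wait 500ms before the first retry, and never more than 10s between retries.
      initial_interval: 500ms
      max_interval: 10s
```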

max_in_flight#

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 1

batching#

Allows you to configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count#

The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.byte_size#

The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.period#

A period after which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check#

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors#

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}