Skip to main content

kafka_franz

EXPERIMENTAL

This component is experimental and therefore subject to change or removal outside of major version releases.

An alternative Kafka input using the Franz Kafka client library.

Introduced in version 3.61.0.

# Common config fields, showing default values
input:
label: ""
kafka_franz:
seed_brokers: []
topics: []
regexp_topics: false
consumer_group: ""

Consumes one or more topics by balancing the partitions across any other connected clients with the same consumer group.

This input is new and experimental, and the existing kafka input is not going anywhere, but here's some reasons why it might be worth trying this one out:

  • You like shiny new stuff
  • You are experiencing issues with the existing kafka input
  • Someone told you to

Metadata

This input adds the following metadata fields to each message:

- kafka_key
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp_unix
- All record headers

Fields

seed_brokers

A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses.

Type: array

# Examples

seed_brokers:
- localhost:9092

seed_brokers:
- foo:9092
- bar:9092

seed_brokers:
- foo:9092,bar:9092

topics

A list of topics to consume from, partitions are automatically shared across consumers sharing the consumer group.

Type: array

regexp_topics

Whether listed topics should be interpretted as regular expression patterns for matching multiple topics.

Type: bool
Default: false

consumer_group

A consumer group to consume as. Partitions are automatically distributed across consumers sharing a consumer group, and partition offsets are automatically commited and resumed under this name.

Type: string

checkpoint_limit

Determines how many messages of the same partition can be processed in parallel before applying back pressure. When a message of a given offset is delivered to the output the offset is only allowed to be committed when all messages of prior offsets have also been delivered, this ensures at-least-once delivery guarantees. However, this mechanism also increases the likelihood of duplicates in the event of crashes or server faults, reducing the checkpoint limit will mitigate this.

Type: int
Default: 1024

commit_period

The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown.

Type: string
Default: "5s"

start_from_oldest

If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset.

Type: bool
Default: true

tls

Custom TLS settings can be used to override system defaults.

Type: object

tls.enabled

Whether custom TLS settings are enabled.

Type: bool
Default: false

tls.skip_cert_verify

Whether to skip server side certificate verification.

Type: bool
Default: false

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message local error: tls: no renegotiation.

Type: bool
Default: false
Requires version 3.45.0 or newer

tls.root_cas

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

# Examples

root_cas: |-
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----

tls.root_cas_file

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

# Examples

root_cas_file: ./root_cas.pem

tls.client_certs

A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.

Type: array

# Examples

client_certs:
- cert: foo
key: bar

client_certs:
- cert_file: ./example.pem
key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string
Default: ""

tls.client_certs[].key

A plain text certificate key to use.

Type: string
Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string
Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string
Default: ""

tls.client_certs[].password

A plain text password for when the private key is a password encrypted PEM block according to RFC 1423. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

Type: string
Default: ""

# Examples

password: foo

password: ${KEY_PASSWORD}

sasl

Specify one or more methods of SASL authentication. SASL is tried in order; if the broker supports the first mechanism, all connections will use that mechanism. If the first mechanism fails, the client will pick the first supported mechanism. If the broker does not support any client mechanisms, connections will fail.

Type: array

# Examples

sasl:
- mechanism: SCRAM-SHA-512
password: bar
username: foo

sasl[].mechanism

The SASL mechanism to use.

Type: string

OptionSummary
AWS_MSK_IAMAWS IAM based authentication as specified by the 'aws-msk-iam-auth' java library.
OAUTHBEAREROAuth Bearer based authentication.
PLAINPlain text authentication.
SCRAM-SHA-256SCRAM based authentication as specified in RFC5802.
SCRAM-SHA-512SCRAM based authentication as specified in RFC5802.
noneDisable sasl authentication

sasl[].username

A username to provide for PLAIN or SCRAM-* authentication.

Type: string
Default: ""

sasl[].password

A password to provide for PLAIN or SCRAM-* authentication.

Type: string
Default: ""

sasl[].token

The token to use for a single session's OAUTHBEARER authentication.

Type: string
Default: ""

sasl[].extensions

Key/value pairs to add to OAUTHBEARER authentication requests.

Type: object

sasl[].aws

Contains AWS specific fields for when the mechanism is set to AWS_MSK_IAM.

Type: object

sasl[].aws.region

The AWS region to target.

Type: string
Default: ""

sasl[].aws.endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""

sasl[].aws.credentials

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object

sasl[].aws.credentials.profile

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

sasl[].aws.credentials.id

The ID of credentials to use.

Type: string
Default: ""

sasl[].aws.credentials.secret

The secret for the credentials being used.

Type: string
Default: ""

sasl[].aws.credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

sasl[].aws.credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Type: bool
Default: false
Requires version 4.2.0 or newer

sasl[].aws.credentials.role

A role ARN to assume.

Type: string
Default: ""

sasl[].aws.credentials.role_external_id

An external ID to provide when assuming a role.

Type: string
Default: ""