kinesis_balanced

BETA: This input is a beta component and is subject to change outside of major version releases.

Receives messages from a Kinesis stream and automatically balances shards across consumers.

```yaml
# Common config fields, showing default values
input:
  kinesis_balanced:
    stream: ""
    dynamodb_table: ""
    start_from_oldest: true
    region: eu-west-1
    batching:
      count: 1
      byte_size: 0
      period: ""
```

Messages consumed by this input can be processed in parallel, meaning a single instance of this input can utilise any number of threads within a pipeline section of a config.

Use the batching fields to configure an optional batching policy.
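The following is a minimal sketch of a config that pairs this input with a multi-threaded pipeline section. The stream name `my-stream`, the table name `my-stream-offsets` and the thread count are hypothetical placeholders. It assumes the `pipeline.threads` field of your Benthos version controls how many processing threads run in parallel.

```yaml
input:
  kinesis_balanced:
    stream: my-stream              # hypothetical stream name
    dynamodb_table: my-stream-offsets  # hypothetical offset table
    region: eu-west-1
    batching:
      count: 10                    # illustrative batch size
      period: 1s
pipeline:
  threads: 4                       # illustrative; messages from this one input are spread across these threads
  processors: []
```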

Metadata

This input adds the following metadata fields to each message:

- kinesis_shard
- kinesis_partition_key
- kinesis_sequence_number

You can access these metadata fields using function interpolation.
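As an illustration of function interpolation, the sketch below forwards messages to Kafka and reuses each record's Kinesis partition key as the Kafka message key. The broker address and topic name are hypothetical. It uses the legacy `${!metadata:<key>}` interpolation syntax. Newer versions use `${! meta("kinesis_partition_key") }` instead, so check which form your version supports.

```yaml
output:
  kafka:
    addresses: [ localhost:9092 ]   # hypothetical broker address
    topic: kinesis_mirror           # hypothetical topic name
    # Reuse the partition key that this input attached as metadata
    key: "${!metadata:kinesis_partition_key}"
```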

Fields

stream

The Kinesis stream to consume from.

Type: string
Default: ""

dynamodb_table

A DynamoDB table to use for offset storage.

Type: string
Default: ""

dynamodb_billing_mode

A billing mode to set for the offset DynamoDB table.

Type: string
Default: ""

dynamodb_read_provision

The read capacity of the offset DynamoDB table.

Type: number
Default: 0

dynamodb_write_provision

The write capacity of the offset DynamoDB table.

Type: number
Default: 0
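The sketch below shows the DynamoDB offset-table fields together. The table name and capacity numbers are hypothetical. `PROVISIONED` and `PAY_PER_REQUEST` are DynamoDB's own billing mode names. It is an assumption that this field accepts them verbatim and that the read/write provision values only matter under `PROVISIONED`.

```yaml
input:
  kinesis_balanced:
    stream: my-stream                     # hypothetical stream name
    dynamodb_table: my-stream-offsets     # hypothetical offset table
    dynamodb_billing_mode: PROVISIONED    # assumed to take DynamoDB's billing mode names
    dynamodb_read_provision: 5            # illustrative read capacity
    dynamodb_write_provision: 5           # illustrative write capacity
```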

start_from_oldest

Whether to consume from the oldest message when an offset does not yet exist for the stream.

Type: bool
Default: true

region

The AWS region to target.

Type: string
Default: "eu-west-1"

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""
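For local testing, `endpoint` can point at an AWS emulator. The following sketch assumes LocalStack listening on its default edge port 4566, which serves both Kinesis and DynamoDB. Whether this input applies the single endpoint to both its Kinesis and DynamoDB clients is an assumption to verify. The stream and table names are hypothetical.

```yaml
input:
  kinesis_balanced:
    stream: my-stream                  # hypothetical stream name
    dynamodb_table: my-stream-offsets  # hypothetical offset table
    region: us-east-1
    endpoint: http://localhost:4566    # assumed LocalStack edge endpoint
```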

credentials

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object
Default: {"id":"","profile":"","role":"","role_external_id":"","secret":"","token":""}

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

credentials.id

The ID of credentials to use.

Type: string
Default: ""

credentials.secret

The secret for the credentials being used.

Type: string
Default: ""

credentials.token

The token for the credentials being used. This is required when using short-term credentials.

Type: string
Default: ""
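The sketch below supplies short-term static credentials without hard-coding them. It relies on environment variable interpolation (`${VAR}`) in the config file. The variable names `KINESIS_ACCESS_KEY_ID`, `KINESIS_SECRET_ACCESS_KEY` and `KINESIS_SESSION_TOKEN` are hypothetical, as are the stream and table names.

```yaml
input:
  kinesis_balanced:
    stream: my-stream                  # hypothetical stream name
    dynamodb_table: my-stream-offsets  # hypothetical offset table
    credentials:
      id: ${KINESIS_ACCESS_KEY_ID}         # hypothetical env var
      secret: ${KINESIS_SECRET_ACCESS_KEY} # hypothetical env var
      token: ${KINESIS_SESSION_TOKEN}      # required for short-term credentials
```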

credentials.role

A role ARN to assume.

Type: string
Default: ""

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string
Default: ""
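The following sketch assumes a role for cross-account access. The account ID, role name, profile and external ID are all hypothetical. It is an assumption that the named profile supplies the base credentials used to call `AssumeRole`.

```yaml
input:
  kinesis_balanced:
    stream: my-stream                  # hypothetical stream name
    dynamodb_table: my-stream-offsets  # hypothetical offset table
    credentials:
      profile: dev                     # hypothetical profile in ~/.aws/credentials
      role: arn:aws:iam::123456789012:role/kinesis-consumer  # hypothetical role ARN
      role_external_id: my-external-id # hypothetical external ID
```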

batching

Allows you to configure a batching policy.

Type: object
Default: {"byte_size":0,"condition":{"static":false,"type":"static"},"count":1,"period":"","processors":[]}

```yaml
# Examples

batching:
  byte_size: 5000
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  condition:
    text:
      arg: END BATCH
      operator: contains
  period: 1m
```

batching.count

A number of messages at which the batch should be flushed. Setting this to 0 disables count-based batching.

Type: number
Default: 1

batching.byte_size

A number of bytes at which the batch should be flushed. Setting this to 0 disables size-based batching.

Type: number
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

```yaml
# Examples

period: 1s

period: 1m

period: 500ms
```
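Because `count` defaults to 1, every message is otherwise flushed on its own. The sketch below turns off count- and size-based flushing so that batches are flushed purely on time. The 5s value is illustrative. It assumes your version accepts a policy where `period` is the only active trigger.

```yaml
batching:
  count: 0       # disable count-based flushing
  byte_size: 0   # disable size-based flushing
  period: 5s     # flush whatever has accumulated every 5 seconds
```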

batching.condition

A condition to test against each message entering the batch. If this condition resolves to true, the batch is flushed.

Type: object
Default: {"static":false,"type":"static"}

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch; therefore, splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

```yaml
# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
```
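Putting the batching fields together, the following sketch shows a complete input that collects up to 100 records, or whatever arrives within 5s, and archives each flushed batch into a single JSON array message. The stream name, table name and thresholds are hypothetical.

```yaml
input:
  kinesis_balanced:
    stream: my-stream                  # hypothetical stream name
    dynamodb_table: my-stream-offsets  # hypothetical offset table
    batching:
      count: 100                       # illustrative count threshold
      period: 5s                       # flush incomplete batches after 5s
      processors:
        - archive:
            format: json_array         # combine the batch into one JSON array
```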