kinesis

Receive messages from a Kinesis stream.

# Common config fields, showing default values
input:
  kinesis:
    stream: ""
    shard: "0"
    client_id: benthos_consumer
    commit_period: 1s
    dynamodb_table: ""
    start_from_oldest: true
    region: eu-west-1
    batching:
      count: 1
      byte_size: 0
      period: ""

It's possible to use DynamoDB for persisting shard iterators by setting the table name. Offsets will then be tracked per client_id per shard_id. When using this mode you should create a table with namespace as the primary key and shard_id as a sort key.
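For example, a minimal sketch enabling DynamoDB offset storage (the stream and table names are placeholders, not real resources):

input:
  kinesis:
    stream: foo_stream          # placeholder stream name
    shard: "0"
    client_id: benthos_consumer
    commit_period: 1s
    dynamodb_table: foo_offsets # placeholder table; create it with namespace as primary key and shard_id as sort key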

Use the batching fields to configure an optional batching policy. Any other batching mechanism will stall with this input due to its sequential transaction model.
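As a sketch, the following flushes a batch once it contains 10 messages or once a second has passed, whichever happens first (the stream name is a placeholder):

input:
  kinesis:
    stream: foo_stream # placeholder stream name
    batching:
      count: 10
      period: 1s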

Fields

stream

The Kinesis stream to consume from.

Type: string
Default: ""

shard

The shard to consume from.

Type: string
Default: "0"

client_id

The client identifier to assume.

Type: string
Default: "benthos_consumer"

commit_period

The rate at which offset commits should be sent.

Type: string
Default: "1s"

dynamodb_table

A DynamoDB table to use for offset storage.

Type: string
Default: ""

start_from_oldest

Whether to consume from the oldest message when an offset does not yet exist for the stream.

Type: bool
Default: true

region

The AWS region to target.

Type: string
Default: "eu-west-1"

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""
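As an illustrative sketch, the endpoint can be pointed at a non-default API address, for instance a local Kinesis-compatible emulator (the URL below is an assumed example, not a documented default):

input:
  kinesis:
    stream: foo_stream              # placeholder stream name
    region: eu-west-1
    endpoint: http://localhost:4566 # assumed local emulator address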

credentials

Optional manual configuration of AWS credentials to use.

Type: object
Default: {"id":"","profile":"","role":"","role_external_id":"","secret":"","token":""}

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

credentials.id

The ID of credentials to use.

Type: string
Default: ""

credentials.secret

The secret for the credentials being used.

Type: string
Default: ""

credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

credentials.role

A role ARN to assume.

Type: string
Default: ""

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string
Default: ""

timeout

The period of time to wait before abandoning a request and trying again.

Type: string
Default: "5s"

limit

The maximum number of messages to consume from each request.

Type: number
Default: 100

batching

Allows you to configure a batching policy.

Type: object
Default: {"byte_size":0,"condition":{"static":false,"type":"static"},"count":1,"period":"","processors":[]}

# Examples

batching:
  byte_size: 5000
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  condition:
    text:
      arg: END BATCH
      operator: contains
  period: 1m

batching.count

The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.

Type: number
Default: 1

batching.byte_size

The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.

Type: number
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.condition

A condition to test against each message entering the batch. If the condition resolves to true then the batch is flushed.

Type: object
Default: {"static":false,"type":"static"}

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}