redis_streams

Pulls messages from Redis (v5.0+) streams with the XREADGROUP command. The client_id should be unique for each consumer of a group.

# Common config fields, showing default values
input:
redis_streams:
url: tcp://localhost:6379
body_key: body
streams:
- benthos_stream
limit: 10
client_id: benthos_consumer
consumer_group: benthos_group

Redis stream entries are key/value pairs, as such it is necessary to specify the key that contains the body of the message. All other keys/value pairs are saved as metadata fields.

Fields

url

The URL of a Redis server to connect to.

Type: string
Default: "tcp://localhost:6379"

body_key

The field key to extract the raw message from. All other keys will be stored in the message as metadata.

Type: string
Default: "body"

streams

A list of streams to consume from.

Type: array
Default: ["benthos_stream"]

limit

The maximum number of messages to consume from a single request.

Type: number
Default: 10

client_id

An identifier for the client connection.

Type: string
Default: "benthos_consumer"

consumer_group

An identifier for the consumer group of the stream.

Type: string
Default: "benthos_group"

start_from_oldest

If an offset is not found for a stream, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset.

Type: bool
Default: true

commit_period

The period of time between each commit of the current offset. Offsets are always committed during shutdown.

Type: string
Default: "1s"

timeout

The length of time to poll for new messages before reattempting.

Type: string
Default: "1s"