read_until

Reads messages from a child input until a consumed message passes a condition, at which point the input closes.

# Config fields, showing default values
input:
  read_until:
    input: {}
    condition:
      text:
        arg: ""
        operator: equals_cs
        part: 0
      type: text
    restart_input: false

Messages are read continuously while the condition returns false. When the condition returns true, the message that triggered it is sent out and the input is closed. Use this type to define inputs where the stream should end once a certain message appears.
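As a minimal sketch of this behaviour, the config below reads lines from standard input until one is exactly "quit". It assumes the stdin input with its default settings, and the "quit" sentinel is a hypothetical value chosen for illustration:

```yaml
# Sketch: stop reading once a line equal to "quit" arrives.
input:
  read_until:
    input:
      stdin: {}
    condition:
      text:
        operator: equals_cs  # case-sensitive equality, as in the default config
        arg: "quit"          # hypothetical sentinel value
        part: 0
```

Because the triggering message is sent out before the input closes, the "quit" line itself would still reach the rest of the pipeline.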

Some inputs close themselves. For example, the file input type shuts down when it reaches the end of a file. By default this type also shuts down when its child input does. To have the child input restarted every time it shuts down until the condition is met, set restart_input to true.
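A config along the following lines might be used to keep re-reading a file until a terminating line shows up. This is a sketch only: it assumes the file input accepts a path field, and both the path and the "END" sentinel are hypothetical:

```yaml
# Sketch: reopen the file each time it is exhausted, until a line equal to "END" is read.
input:
  read_until:
    input:
      file:
        path: ./messages.txt  # hypothetical path
    condition:
      text:
        operator: equals_cs
        arg: "END"            # hypothetical sentinel value
        part: 0
    restart_input: true       # without this, the input closes at the first end of file
```

Note that restarting a file input reads the file again from its start, so messages before the sentinel may be consumed more than once.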

Metadata

A metadata key benthos_read_until containing the value final is added to the first part of the message that triggers the input to stop.
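Downstream components can test for this key to detect the final message. The fragment below is a sketch that assumes a metadata condition with key, operator, arg and part fields is available in your version; check the condition documentation before relying on it:

```yaml
# Sketch: a condition that passes only for the message that stopped the input.
condition:
  metadata:
    operator: equals_cs
    key: benthos_read_until
    arg: final
    part: 0  # the key is added to the first part of the message
```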

Fields

input

The child input to consume from.

Type: object
Default: {}

condition

The condition to test messages against.

Type: object
Default: {"text":{"arg":"","operator":"equals_cs","part":0},"type":"text"}

restart_input

Whether the input should be reopened if it closes itself before the condition has resolved to true.

Type: bool
Default: false

Examples

This input is useful when paired with the count condition, as it can be used to cut the input stream off once a certain number of messages have been read:

# Only read 100 messages, and then exit.
input:
  read_until:
    input:
      kafka_balanced:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
    condition:
      not:
        count:
          arg: 100