An input is a source of data piped through an array of optional processors:

    input:
      label: my_redis_input

      redis_streams:
        url: tcp://localhost:6379
        streams:
          - benthos_stream
        body_key: body
        consumer_group: benthos_group

      # Optional list of processing steps
      processors:
        - bloblang: |
            root.document = this.without("links")
            root.link_count = this.links.length()

Some inputs have a logical end. For example, a csv input ends once the last row is consumed. When this happens the input gracefully terminates, and Benthos shuts itself down once all messages have been fully processed.
It's also possible to specify a logical end for an input that otherwise doesn't have one by using the read_until input, which checks a condition against each consumed message to determine whether it should be the last.
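
As a minimal sketch of the read_until input, the config below wraps the Redis streams input from the earlier example and stops after the first message that satisfies a condition. It assumes the `check` and `input` fields of read_until; the `type` field and its value `end` are hypothetical and stand in for whatever marks the final message in your data.

```yaml
input:
  read_until:
    # Bloblang query evaluated against each consumed message. The first
    # message for which it returns true is treated as the last one.
    # The "type" field and the value "end" are hypothetical.
    check: this.type == "end"
    input:
      redis_streams:
        url: tcp://localhost:6379
        streams:
          - benthos_stream
        body_key: body
        consumer_group: benthos_group
```

Once the check passes, the wrapped input closes, so the pipeline can shut down in the same way it would for an input with a natural end.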
Only one input is configured at the root of a Benthos config. However, the root input can be a broker which combines multiple inputs and merges the streams:

    input:
      broker:
        inputs:
          - kafka:
              addresses: [ TODO ]
              topics: [ foo, bar ]
              consumer_group: foogroup

          - redis_streams:
              url: tcp://localhost:6379
              streams:
                - benthos_stream
              body_key: body
              consumer_group: benthos_group

Inputs have an optional field, label, that can uniquely identify them in observability data such as metrics and logs. This is useful when running configs with multiple inputs; without labels, their metrics labels are generated based on their composition. For more information check out the metrics documentation.
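
To illustrate, here is a sketch of a broker whose child inputs each carry their own label. The label names and the Kafka address are hypothetical placeholders, and the sketch assumes each label is unique within the config.

```yaml
input:
  broker:
    inputs:
      # Hypothetical label; metrics and logs for this input are tagged with it.
      - label: orders_kafka
        kafka:
          addresses: [ localhost:9092 ] # placeholder address
          topics: [ foo, bar ]
          consumer_group: foogroup

      # Hypothetical label for the second child input.
      - label: events_redis
        redis_streams:
          url: tcp://localhost:6379
          streams:
            - benthos_stream
          body_key: body
          consumer_group: benthos_group
```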
Sometimes it's useful to consume a sequence of inputs, where an input is only consumed once its predecessor is fully drained. You can achieve this with the sequence input.
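
A rough sketch of reading inputs one after another, assuming the `sequence` input with its `inputs` list: the CSV file is read to the end first, and only then does the second input run. The file path and the final marker message are hypothetical.

```yaml
input:
  sequence:
    inputs:
      # Consumed first, until the last row has been read.
      - csv:
          paths: [ ./dataset.csv ] # hypothetical path

      # Consumed only after the CSV input is fully drained. It emits a
      # single hypothetical marker message and then ends.
      - generate:
          count: 1
          mapping: 'root = {"status":"finished"}'
```

Because both child inputs have a logical end, the sequence as a whole ends too, and Benthos shuts down once every message has been processed.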
It's possible to generate data with Benthos using the generate input, which is also a convenient way to trigger scheduled pipelines.
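
As a sketch of a scheduled trigger, the config below emits one small message every five minutes for the rest of the pipeline to act on. It assumes the `interval` and `mapping` fields of the generate input; the schedule and the output field names are illustrative only.

```yaml
input:
  generate:
    # How often to emit a message (illustrative schedule).
    interval: '@every 5m'
    # Bloblang mapping that builds each generated message.
    # The field names "id" and "triggered_at" are hypothetical.
    mapping: |
      root.id = uuid_v4()
      root.triggered_at = now()
```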
Inputs that consume from storage or message streaming services:
amqp_0_9, amqp_1, aws_kinesis, aws_s3, aws_sqs, azure_blob_storage, azure_queue_storage, discord, gcp_cloud_storage, gcp_pubsub, hdfs, kafka, mqtt, nats, nats_jetstream, nats_stream, nsq, pulsar, redis_list, redis_pubsub, redis_streams, twitter_search
Inputs that consume directly from low level network protocols:
http_client, http_server, nanomsg, sftp, socket, socket_server, websocket, zmq4