Components

A good ninja gets clued up on its gear.

Core Components

Every Benthos pipeline has at least one input, an optional buffer, an output and any number of processors:

input:
kafka_balanced:
addresses: [ TODO ]
topics: [ foo, bar ]
consumer_group: foogroup
buffer:
type: none
pipeline:
processors:
- jmespath:
query: '{ message: @, meta: { link_count: length(links) } }'
output:
s3:
bucket: TODO
path: "${!metadata:kafka_topic}/${!json_field:message.id}.json"

These are the main components within Benthos and they provide the majority of useful behaviour.

Observability Components

There are also the observability components logger, metrics, and tracing, which allow you to specify how Benthos exposes observability data:

logger:
prefix: benthos
level: WARN
json_format: true
metrics:
statsd:
prefix: foo
address: localhost:8125
flush_period: 100ms
tracer:
jaeger:
agent_address: localhost:6831
service_name: benthos

Resource Components

Finally, there are conditions, caches and rate limits. These are components that are useful when used by core components, and they are either configured as a field within that component:

pipeline:
processors:
- filter_parts: # This is a processor
text: # This is a child condition
operator: contains
arg: foo

Or as a resource where they are referenced by one or more core components:

input:
http_client: # This is an input
url: TODO
rate_limit: foo_ratelimit # This is a reference to a rate limit
pipeline:
processors:
- filter_parts: # This is a processor
resource: bar_condition # This is a reference to a condition
- dedupe: # This is another processor
cache: baz_cache # This is a reference to a cache
hash: xxhash
key: ${json_field:id}
resources:
rate_limits:
foo_ratelimit:
local:
count: 500
interval: 1s
conditions:
bar_condition:
text:
operator: contains
arg: foo
caches:
baz_cache:
memcached:
addresses: [ localhost:11211 ]
ttl: 60

For more information about any of these components check out their sections: