Benthos
Fancy stream processing made operationally mundane
It's boringly easy to use
Written in Go, deployed as a static binary, declarative configuration. Open source and cloud native as utter heck.
- Curl
- Homebrew
- Docker
- Asdf
# Install
curl -Lsf https://sh.benthos.dev | bash
# Make a config
benthos create nats/protobuf/aws_sqs > ./config.yaml
# Run
benthos -c ./config.yaml
# Install
brew install benthos
# Make a config
benthos create nats/protobuf/aws_sqs > ./config.yaml
# Run
benthos -c ./config.yaml
# Pull
docker pull jeffail/benthos
# Make a config
docker run --rm jeffail/benthos create nats/protobuf/aws_sqs > ./config.yaml
# Run
docker run --rm -v $(pwd)/config.yaml:/benthos.yaml jeffail/benthos
# Install
asdf plugin add benthos
asdf install benthos latest
asdf global benthos latest
# Make a config
benthos create nats/protobuf/aws_sqs > ./config.yaml
# Run
benthos -c ./config.yaml
- Mapping
- Multiplexing
- Windowing
- Enrichments
input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

output:
  switch:
    cases:
      - check: doc.tags.contains("AWS")
        output:
          aws_sqs:
            url: https://sqs.us-west-2.amazonaws.com/TODO/TODO
            max_in_flight: 20

      - output:
          redis_pubsub:
            url: tcp://TODO:6379
            channel: baz
            max_in_flight: 20
input:
  nats_jetstream:
    urls: [ nats://TODO:4222 ]
    queue: myqueue
    subject: traffic.light.events
    deliver: all

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h

pipeline:
  processors:
    - group_by_value:
        value: '${! json("traffic_light_id") }'
    - mapping: |
        root = if batch_index() == 0 {
          {
            "traffic_light_id": this.traffic_light_id,
            "created_at": @window_end_timestamp,
            "total_cars": json("registration_plate").from_all().unique().length(),
            "passengers": json("passengers").from_all().sum(),
          }
        } else { deleted() }

output:
  http_client:
    url: https://example.com/traffic_data
    verb: POST
    max_in_flight: 64
input:
  mqtt:
    urls: [ tcp://TODO:1883 ]
    topics: [ foo ]

pipeline:
  processors:
    - branch:
        request_map: |
          root.id = this.doc.id
          root.content = this.doc.body
        processors:
          - aws_lambda:
              function: sentiment_analysis
        result_map: root.results.sentiment = this

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("partition") }/${! timestamp_unix_nano() }.tar.gz'
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
Takes Care of the Dull Stuff
Benthos solves common data engineering tasks such as transformations, integrations, and multiplexing with declarative, unit-testable configuration. This allows you to easily and incrementally adapt your data pipelines as requirements change, letting you focus on the more exciting stuff.
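As a sketch of that unit testability, a config can carry its own `tests:` section alongside the pipeline it verifies (the mapping and field names here are illustrative, not from the examples above):

```yaml
pipeline:
  processors:
    - mapping: root.name = this.name.uppercase()

tests:
  - name: uppercases the name field
    target_processors: '/pipeline/processors'
    input_batch:
      - content: '{"name":"benthos"}'
    output_batches:
      - - json_equals: {"name":"BENTHOS"}
```

Running `benthos test ./config.yaml` then executes the test cases against the targeted processors without touching any real inputs or outputs.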
It comes armed with a wide range of processors, a lit mapping language (Bloblang), stateless windowed processing capabilities and an industry leading mascot.
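The mapping language can also be tried straight from the CLI; a quick sketch, assuming the benthos binary is on your path (the sample document is made up):

```shell
# Pipe a JSON document through a Bloblang mapping with the blobl subcommand
echo '{"user":{"first":"Ada","last":"Lovelace"}}' | \
  benthos blobl 'root.full_name = this.user.first + " " + this.user.last'
```

This makes it easy to iterate on a mapping before dropping it into a pipeline config.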
Well Connected
Benthos is able to glue a wide range of sources and sinks together and hook into a variety of databases, caches, HTTP APIs, lambdas and more, enabling you to seamlessly drop it into your existing infrastructure.
Working with disparate APIs and services can be a daunting task, doubly so in a streaming data context. With Benthos it's possible to break these tasks down and automatically parallelize them as a streaming workflow.
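One way to break such tasks down is the `workflow` processor, which runs independent enrichment branches in parallel. A minimal sketch, assuming two hypothetical enrichments (the lambda name and endpoint URL are placeholders):

```yaml
pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        branches:
          sentiment:
            request_map: root.text = this.body
            processors:
              - aws_lambda:
                  function: sentiment_analysis  # placeholder lambda name
            result_map: root.enrichments.sentiment = this
          language:
            request_map: root.text = this.body
            processors:
              - http:
                  url: https://example.com/detect_language  # placeholder endpoint
                  verb: POST
            result_map: root.enrichments.language = this
```

Branches with no dependencies between them are dispatched concurrently, and the results are merged back into the original document.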
Create, edit and test configs visually
Declarative YAML is great for seamlessly integrating with version control tools, but creating and editing large configs can become a right bother.
Benthos Studio is a visual web application that allows you to create/import configs, edit and test them, and share them with others. It's so fun you'll be making configs just for the heck of it! Loser.
Reliable and Operationally Simple
Delivery guarantees can be a dodgy subject. Benthos processes and acknowledges messages using an in-process transaction model with no need for any disk persisted state, so when connecting to at-least-once sources and sinks it's able to guarantee at-least-once delivery even in the event of crashes, disk corruption, or other unexpected server faults.
This behaviour is the default and free of caveats, which also makes deploying and scaling Benthos much simpler. However, simplicity doesn't negate the need for observability, so it also exposes metrics and tracing events to targets of your choice.
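As a sketch of that observability wiring, metrics and tracing targets are declared at the top level of the same config; this example assumes Prometheus scraping and a local Jaeger agent:

```yaml
# Expose metrics in Prometheus format (scraped from the HTTP server's /metrics endpoint)
metrics:
  prometheus: {}

# Emit tracing events to a Jaeger agent
tracer:
  jaeger:
    agent_address: localhost:6831
```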
Extendable
Sometimes the components that come with Benthos aren't enough. Luckily, Benthos has been designed so that new components can easily be plugged in.
You can either write plugins directly in Go (recommended) or you can have Benthos run your plugin as a subprocess.
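For the subprocess route, a minimal sketch (the binary path is a placeholder): the `subprocess` processor pipes each message to your program's stdin and reads the transformed result back from its stdout:

```yaml
pipeline:
  processors:
    - subprocess:
        name: ./my_enrichment_plugin  # placeholder path to your executable
        args: []
```

Go plugins avoid the serialization overhead of a subprocess, which is why they're the recommended route, but the subprocess option lets you reuse existing tools in any language.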