Windowed Processing

There are many ways to perform windowed or aggregated message processing with the wide range of connectors and processors Benthos offers, but most of them rely on aggregating messages in transit with a cache or database.

Instead, this document outlines the simplest way of performing tumbling window processing in Benthos: input-level batching. There are plans to eventually offer other windowing mechanisms, such as hopping and sliding windows, and these will behave similarly.

Creating Batches

Firstly, we will need to create batches of messages before our processing stages. This batch mechanism is what creates our window of messages, which we can later process as a group.

Some inputs support batching natively; those that don't can be wrapped within a broker, as sketched after the following example:

```yaml
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup
    batching:
      count: 50
      period: 30s
```
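
As a minimal sketch of the broker approach, assuming a hypothetical http_server input that doesn't batch on its own:

```yaml
input:
  broker:
    inputs:
      # Any input can be wrapped here; http_server is just an example.
      - http_server:
          path: /post
    batching:
      count: 50
      period: 30s
```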

NOTE: Batching here doesn't mean we have to output messages as a batch. After processing we can break this batch out and even re-batch with different settings if we want.
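
For example, here's a minimal sketch of one way to do that, using the split processor to break the window back out after processing and a hypothetical kafka output that re-batches with its own policy:

```yaml
pipeline:
  processors:
    # Break the processed window back out into individual messages.
    - split:
        size: 1

output:
  kafka:
    addresses: [ TODO ]
    topic: baz
    batching:
      # Re-batch with different settings for the output.
      count: 10
```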

When a batching policy is defined at the input level, the input consumes and aggregates messages until the batch is complete, at which point the batch is flushed downstream to your processors and subsequently your outputs.

Tune the batch parameters to suit the size (or time interval, etc.) of the window you require.
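
For example, if you want windows defined purely by a time interval then something like this ought to work, assuming a count of zero disables the count-based trigger:

```yaml
batching:
  count: 0    # rely solely on the period
  period: 60s # flush a window every minute
```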

Grouping

Once our messages are batched, we have one large but general window of messages. Depending on our use case, we may wish to divide the window into groups based on the contents of its messages. For that purpose we have two processor options: group_by and group_by_value.

For example, we can break our window out into groups based on each message's Kafka key:

```yaml
pipeline:
  processors:
    - group_by_value:
        value: ${! meta("kafka_key") }
```
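
The group_by processor works similarly but forms groups with Bloblang checks rather than an interpolated value. As a minimal sketch, assuming our documents carry a doc.type field:

```yaml
pipeline:
  processors:
    - group_by:
        # Each check is a Bloblang query; messages matching a check
        # are processed together as one group.
        - check: this.doc.type == "article"
        - check: this.doc.type == "comment"
```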

Aggregating

The main purpose of windowing messages is so that they can be aggregated into a single message that summarises the window. For this purpose we have many options within Benthos, but for this guide we'll cover a select few, each demonstrated with Bloblang.

Flat Counter

The simplest aggregation is counting how many messages fell within the window, which we can do with the bloblang processor and its batch_size function:

```yaml
pipeline:
  processors:
    # TODO: Paste group processor here if you want it.

    # Set the value of doc.count to the batch size.
    - bloblang: |
        root = this
        doc.count = batch_size()

        # Drop all documents except the first.
        root = match {
          batch_index() > 0 => deleted()
        }
```
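
For example, assuming the window contains three messages and the first document of the batch is {"doc":{"contents":"foobar"}}, the output would be a single message:

```json
{
  "doc": {
    "contents": "foobar",
    "count": 3
  }
}
```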

Real Counter

If you have a group of structured documents containing numeric values that you wish to count, then that's also straightforward with Bloblang. For brevity we're going to assume our messages are JSON documents of the format:

{"doc":{"count":5,"contents":"foobar"}}

We'll also assume that we only wish to preserve the first message of the batch. We can do this by extracting the doc.count value of each document into an array with the method from_all and adding the values together with the method sum:

```yaml
pipeline:
  processors:
    # TODO: Paste group processor here if you want it.

    - bloblang: |
        root = this
        doc.count = json("doc.count").from_all().sum()

        # Drop all documents except the first.
        root = match {
          batch_index() > 0 => deleted()
        }
```

This results in a document containing our aggregated count, along with the rest of the first document of the batch:

```json
{
  "doc": {
    "count": 243,
    "contents": "foobar"
  }
}
```
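
The same pattern extends to other arithmetic. As a minimal sketch (the doc.mean field name is our own invention), dividing the sum by the batch size yields a mean instead of a total:

```yaml
pipeline:
  processors:
    - bloblang: |
        root = this

        # Average doc.count across the window rather than summing it.
        doc.mean = json("doc.count").from_all().sum() / batch_size()

        # Drop all documents except the first.
        root = match {
          batch_index() > 0 => deleted()
        }
```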

Custom Folding

Bloblang also has a method fold which allows you to write custom folding logic for your values. Here's an example where we implement a max function for our counts:

```yaml
pipeline:
  processors:
    # TODO: Paste group processor here if you want it.

    - bloblang: |
        root = this
        doc.max = json("doc.count").from_all().fold(0, match {
          tally < value => value
          _ => tally
        })

        # Drop all documents except the first.
        root = match {
          batch_index() > 0 => deleted()
        }
```

Bloblang is very powerful, and by using the methods from and from_all it's possible to perform a wide range of batch-wide processing.
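
As a closing sketch of from, which executes a query from the perspective of a specific message of the batch, here each document copies the Kafka key of the first message into a hypothetical first_key field:

```yaml
pipeline:
  processors:
    - bloblang: |
        root = this

        # Copy the Kafka key of the first message in the batch
        # onto every document in the window.
        first_key = meta("kafka_key").from(0)
```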