Windowed Processing
There are many ways of performing windowed or aggregated message processing with the wide range of connectors and processors Benthos offers, but these usually rely on aggregating messages in transit within a cache or database.
Instead, this document outlines the simplest way of performing tumbling window processing in Benthos, which is to use input-level batching. There are plans to eventually offer other windowing mechanisms, such as hopping or sliding windows, and these will behave similarly.
Creating Batches
Firstly, we need to create batches of messages before our processing stages. This batching mechanism is what creates our window of messages, which we can later process as a group.
Some inputs, such as Kafka, support batching natively; others, such as NATS, can be wrapped within a broker that applies the batching policy on their behalf.
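A minimal sketch of native batching on the kafka input. The broker address, topic and consumer group are hypothetical placeholders, and the field names should be checked against the kafka input and batching policy reference for the Benthos version you run. With this policy a batch is flushed once it holds 100 messages or 30 seconds have passed, whichever comes first.

```yaml
input:
  kafka:
    addresses: [ localhost:9092 ]  # hypothetical broker address
    topics: [ foo ]                # hypothetical topic
    consumer_group: benthos_group  # hypothetical consumer group
    batching:
      count: 100   # flush once 100 messages have been consumed...
      period: 30s  # ...or once 30 seconds have passed, whichever comes first
```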
NOTE: Batching here doesn't mean we have to output messages as a batch. After processing we can break this batch out and even re-batch with different settings if we want.
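As a sketch of breaking a window back out, a split processor placed after the window processing divides each batch into single messages, and the output then applies its own, unrelated batching policy. The output address, topic name and count of 10 are hypothetical placeholders.

```yaml
pipeline:
  processors:
    # ... window processing goes here ...
    - split:
        size: 1  # break the window back out into individual messages

output:
  kafka:
    addresses: [ localhost:9092 ]  # hypothetical broker address
    topic: bar                     # hypothetical output topic
    batching:
      count: 10  # re-batch with different settings before sending
```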
When a batching policy is defined at the input level, the input consumes messages and aggregates them until the batch is complete, at which point the batch is flushed downstream to your processors and then on to your outputs.
Tune the batching parameters to suit the size (or time interval, etc.) of the window you require.
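For an input without native batching, such as nats, a minimal sketch is to wrap it in a broker and define the batching policy on the broker. Only period is set here, so each batch is roughly a one-minute tumbling window of whatever arrived. The URL, subject and queue names are hypothetical placeholders.

```yaml
input:
  broker:
    inputs:
      - nats:
          urls: [ nats://localhost:4222 ]  # hypothetical NATS server
          subject: foo                     # hypothetical subject
          queue: benthos_queue             # hypothetical queue group
    batching:
      period: 1m  # flush whatever has arrived roughly once a minute
```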
Grouping
Once our messages are batched, we have one large but general window of messages. Depending on our use case, we may wish to divide them into groups based on their contents. For that purpose we have two processor options: group_by and group_by_value.
For example, we can break our window out into groups based on each message's Kafka key.
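A minimal sketch using group_by_value, assuming the messages come from the kafka input, which sets each message's key as the metadata field kafka_key. Processors placed after it run against each resulting group as its own batch. The interpolation syntax for reading metadata may differ between Benthos versions.

```yaml
pipeline:
  processors:
    # Split the window into one batch per distinct Kafka key
    - group_by_value:
        value: ${! meta("kafka_key") }
```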
Aggregating
The main purpose of windowing messages is so that they can be aggregated into a single message that summarises the window. Benthos offers many options for this, but in this guide we'll cover a select few, each of which uses Bloblang.
Flat Counter
The easiest aggregation to perform is simply counting how many messages were within the window. This is easy to do with the bloblang processor using the batch_size function.
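A sketch of a flat counter, assuming the window should collapse into a single document of the hypothetical form {"count": N}. Every message is mapped to the size of its batch, then every message except the first is dropped with deleted(), leaving one summary per window.

```yaml
pipeline:
  processors:
    - bloblang: |
        # Map each message to the number of messages in its batch (window)
        root.count = batch_size()
        # Drop every message except the first, leaving one summary per window
        root = if batch_index() > 0 { deleted() }
```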
Real Counter
If you have a group of structured documents containing numeric values that you wish to add up, that's also pretty easy with Bloblang. For brevity we're going to assume our messages are JSON documents that each hold a numeric value at the path doc.count, and that we only wish to preserve the first message of the batch. We can do this by extracting the doc.count value of each document into an array with the method from_all and adding them together with the method sum.
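A minimal sketch, assuming every message in the window has a numeric doc.count field. from_all runs the json("doc.count") query against every message in the batch and returns the results as an array, which sum then adds up.

```yaml
pipeline:
  processors:
    - bloblang: |
        # Start from the original document...
        root = this
        # ...but overwrite doc.count with the total of doc.count across the batch
        root.doc.count = json("doc.count").from_all().sum()
        # Preserve only the first message of the batch
        root = if batch_index() > 0 { deleted() }
```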
This results in a single document containing our aggregated count, along with the rest of the first document of the batch.
Custom Folding
Bloblang also has a method fold, which allows you to write custom folding logic for your values. For example, we can use fold to implement a max function for our counts.
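A sketch of a max fold under the same doc.count assumption as the counter above. For each element, fold's query sees an object with tally (the accumulated value) and value (the current element). The initial value of 0 assumes counts are never negative, and the named-context item -> syntax assumes a Benthos version that supports it.

```yaml
pipeline:
  processors:
    - bloblang: |
        root = this
        # Keep whichever is larger, the running tally or the current value
        root.doc.count = json("doc.count").from_all().fold(0, item -> if item.value > item.tally { item.value } else { item.tally })
        # Preserve only the first message of the batch
        root = if batch_index() > 0 { deleted() }
```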
Bloblang is very powerful, and by using from and from_all it's possible to perform a wide range of batch-wide processing.
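As a small illustration of from, the sketch below executes a query from the perspective of a specific message in the batch, here the first (index 0). It copies that message's doc.count into every message of the window under first_count, a hypothetical field name chosen for this example.

```yaml
pipeline:
  processors:
    - bloblang: |
        root = this
        # Read doc.count from the first message of the batch
        root.doc.first_count = json("doc.count").from(0)
```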