There are many ways of performing windowed or aggregated message processing with the wide range of connectors and processors Benthos offers, but this usually relies on aggregating messages in transit with a cache or database.
Instead, this document outlines the simplest way of performing tumbling window processing in Benthos, which is to use input level batching. There are plans to eventually offer other windowing mechanisms such as hopping or sliding and these will behave similarly.
Firstly, we will need to create batches of messages before our processing stages. This batch mechanism is what creates our window of messages, which we can later process as a group.
NOTE: Batching here doesn't mean we have to output messages as a batch. After processing we can break this batch out and even re-batch with different settings if we want.
When a batching policy is defined at the input level it means inputs will consume messages and aggregate them until the batch is complete, at which point it is flushed downstream to your processors and subsequently your outputs.
Tune the batch parameters to suit the size (or time interval, etc.) of the window you require.
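As a rough sketch of input-level batching, assuming a Kafka input, a window that flushes after 50 messages or 10 seconds (whichever comes first) might be configured as below. The broker address, topic, and consumer group are placeholders, and the `count` and `period` values are arbitrary.

```yaml
input:
  kafka:
    addresses: [ localhost:9092 ]   # placeholder broker address
    topics: [ example_topic ]       # placeholder topic name
    consumer_group: example_group   # placeholder consumer group
    batching:
      count: 50     # flush once 50 messages have been collected...
      period: 10s   # ...or once 10 seconds have passed, whichever comes first
```

For an input that has no `batching` field of its own, one option is to wrap it in a `broker` input and set the policy there. The NATS URL and subject below are likewise placeholders.

```yaml
input:
  broker:
    inputs:
      - nats:
          urls: [ nats://localhost:4222 ]   # placeholder server URL
          subject: example_subject          # placeholder subject
    batching:
      count: 50
      period: 10s
```

Setting only `period` and leaving `count` unset should give purely time-based windows.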
Once our messages are batched we have one large but general window of messages. Depending on our use case we may wish to divide them into groups based on their contents. For that purpose we have two processor options: `group_by` and `group_by_value`.
For example, we can break our window out into groups based on each message's Kafka key:
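A minimal sketch of this grouping, assuming the messages came from a Kafka input (which attaches the key as `kafka_key` metadata) and using the `group_by_value` processor:

```yaml
pipeline:
  processors:
    - group_by_value:
        # Messages whose kafka_key metadata resolves to the same string
        # end up in the same group; each group becomes its own batch.
        value: ${! meta("kafka_key") }
```

Processors placed after this one then see each group as a separate batch, so any aggregation that follows is applied per key rather than across the whole window.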
The main purpose of windowing messages is so they can be aggregated into a single message that summarises the window. For this purpose we have lots of options within Benthos, but for this guide we'll cover a select few, where each example uses Bloblang.
If you have a group of structured documents containing numeric values that you wish to total then that's also pretty easy with Bloblang. For brevity we're going to assume our messages are JSON documents that each carry a numeric value at the path `doc.count`, and that we only wish to preserve the first message of the batch. We can do this by extracting the `doc.count` value of each document into an array with the method `from_all` and adding them with the method `sum`:
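The mapping described above can be sketched roughly as follows. It assumes documents shaped like `{"doc":{"count":2,"contents":"a"}}` (a hypothetical sample) and uses the `bloblang` processor; in newer releases the same mapping can be placed under a `mapping` processor instead.

```yaml
pipeline:
  processors:
    - bloblang: |
        # Start from the current document.
        root = this
        # Collect doc.count from every message in the batch and sum them.
        root.doc.count = json("doc.count").from_all().sum()
        # Drop every message except the first one in the batch.
        root = if batch_index() != 0 { deleted() }
```

The deletion is deliberately the last statement, so that no later assignment writes to a message that has already been removed.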
This results in a document containing our aggregated count, along with the rest of the first document of the batch.
Bloblang also has a method `fold` which allows you to write custom folding logic for your values. Here's an example where we implement a max function for our counts:
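One way such a max might be written is sketched below. It is not necessarily the form the original example used: it assumes the same `doc.count` documents as before, writes the result to a hypothetical `doc.max` field, and starts the fold from 0, which only works if counts are never negative.

```yaml
pipeline:
  processors:
    - bloblang: |
        root = this
        # Walk the counts, keeping whichever of tally and value is larger.
        # The initial tally of 0 assumes counts are never negative.
        root.doc.max = json("doc.count").from_all().fold(0, item -> if item.value > item.tally { item.value } else { item.tally })
        # Keep only the first message of the batch.
        root = if batch_index() != 0 { deleted() }
```

Inside the fold, `item.tally` holds the running result and `item.value` the current element, so the same pattern can be adapted to other custom aggregations such as a minimum.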