This cookbook demonstrates how to merge JSON events from parallel streams using content-based rules and a cache of your choice.
The imaginary problem we are going to solve is hydrating a feed of article comments with information from their parent articles. We will be consuming and writing to Kafka, but the example works with any input and output combination.
Articles are received over the topic `articles` and look like this:
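As an illustration only, an article event could be shaped as follows. The field names (`type`, `article.id`, `article.title`, `article.content`, `user.id`) and all values are assumptions made for this sketch, not a confirmed schema. The small Python wrapper just parses the JSON and picks out the field that would serve as the cache key.

```python
import json

# Hypothetical article event. Every field name and value here is an
# assumption made for illustration.
ARTICLE_EVENT = """
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Example article",
    "content": "this is an example article"
  },
  "user": {
    "id": "user1"
  }
}
"""

article_event = json.loads(ARTICLE_EVENT)

# The article ID is the natural key for caching and later lookups.
article_key = article_event["article"]["id"]
print(article_key)
```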
Comments can be posted either on an article or on a parent comment. They are received over the topic `comments` and look like this:
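As a sketch, comment events might look like the two below: one posted directly on an article and one posted as a reply to another comment. The field names (`comment.id`, `comment.parent_id`, `comment.content`, `user.id`) and values are assumptions for illustration. The only property the rest of this cookbook relies on is that each comment carries the ID of its parent.

```python
import json

# Hypothetical comment events. Field names and values are assumptions.
COMMENT_EVENTS = """
[
  {
    "type": "comment",
    "comment": {
      "id": "456bar",
      "parent_id": "123foo",
      "content": "a comment on the article"
    },
    "user": {"id": "user2"}
  },
  {
    "type": "comment",
    "comment": {
      "id": "789baz",
      "parent_id": "456bar",
      "content": "a reply to the comment"
    },
    "user": {"id": "user3"}
  }
]
"""

comments = json.loads(COMMENT_EVENTS)

# The first parent is an article ID, the second is another comment's ID.
parent_ids = [c["comment"]["parent_id"] for c in comments]
print(parent_ids)
```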
Our goal is to end up with a single stream of comments, where information about the root article of the comment is attached to the event. The above comment should exit our pipeline looking like this:
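A minimal sketch of the intended result, using hypothetical event shapes (all field names and values are assumptions): the comment keeps its own fields and gains an `article` object copied from its root article. The helper `attach_article` is invented for this illustration and is not part of Benthos.

```python
import json


def attach_article(comment_event, article_event):
    """Return a copy of the comment event with the root article attached."""
    hydrated = dict(comment_event)  # shallow copy; the input is left unchanged
    # Which article fields to attach is an assumption of this sketch.
    hydrated["article"] = {
        "title": article_event["article"]["title"],
        "content": article_event["article"]["content"],
    }
    return hydrated


article = {
    "type": "article",
    "article": {
        "id": "123foo",
        "title": "Example article",
        "content": "this is an example article",
    },
    "user": {"id": "user1"},
}
comment = {
    "type": "comment",
    "comment": {
        "id": "456bar",
        "parent_id": "123foo",
        "content": "a comment on the article",
    },
    "user": {"id": "user2"},
}

hydrated = attach_article(comment, article)
print(json.dumps(hydrated, indent=2))
```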
In order to achieve this we will need to cache articles as they pass through our pipelines and then retrieve them for each comment passing through. Since the parent of a comment might be another comment we will also need to cache and retrieve comments in the same way.
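The caching flow described above can be sketched in plain Python, independent of Benthos. A dict stands in for the external cache, and the event fields are assumptions. One possible design, used here, is to cache each comment together with its root article, so that a reply to a comment resolves the article in a single lookup rather than walking the whole reply chain.

```python
# A plain dict stands in for the external cache in this sketch.
cache = {}


def cache_article(event):
    """Store the article under its own ID."""
    cache[event["article"]["id"]] = {"article": event["article"]}


def hydrate_comment(event):
    """Attach the root article to a comment, then cache the comment itself."""
    parent_id = event["comment"]["parent_id"]
    parent = cache.get(parent_id)
    if parent is None:
        raise KeyError("parent not cached: " + parent_id)
    hydrated = {**event, "article": parent["article"]}
    # Cache the comment under its own ID together with the root article, so a
    # reply to this comment finds the article without further lookups.
    cache[event["comment"]["id"]] = {"article": parent["article"]}
    return hydrated


cache_article(
    {"type": "article", "article": {"id": "123foo", "title": "Example article"}}
)
first = hydrate_comment(
    {"type": "comment", "comment": {"id": "456bar", "parent_id": "123foo"}}
)
reply = hydrate_comment(
    {"type": "comment", "comment": {"id": "789baz", "parent_id": "456bar"}}
)
print(reply["article"]["title"])
```

Note that the reply only succeeds because its parent comment was processed first, which is why ordering matters later on.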
Our first pipeline is very simple: we consume articles, reduce them to only the fields we wish to cache, and then cache them. If we receive the same article multiple times we're going to assume it's okay to overwrite the old article in the cache.
In this example I'm targeting Redis, but you can choose any of the supported cache targets. The TTL of cached articles is set to one week.
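To make the behaviour concrete, here is a rough in-memory model of what this first pipeline does: reduce, then set with overwrite and a one-week TTL. `TTLCache` and `reduce_article` are hypothetical helpers written for this sketch, the event fields are assumptions, and a real deployment would rely on the cache backend's own expiry instead.

```python
import time

ONE_WEEK_SECONDS = 7 * 24 * 60 * 60


class TTLCache:
    """Tiny in-memory cache with a fixed TTL, for illustration only."""

    def __init__(self, ttl_seconds, clock=time.time):
        self._ttl = ttl_seconds
        self._clock = clock
        self._items = {}

    def set(self, key, value):
        # Setting an existing key overwrites it and restarts its TTL.
        self._items[key] = (value, self._clock() + self._ttl)

    def get(self, key):
        entry = self._items.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[key]
            return None
        return value


def reduce_article(event):
    """Keep only the part of the event we want to cache (an assumption)."""
    return {"article": event["article"]}


now = [0.0]  # controllable clock so the example is deterministic
articles = TTLCache(ONE_WEEK_SECONDS, clock=lambda: now[0])

event = {
    "type": "article",
    "article": {"id": "123foo", "title": "Example article"},
    "user": {"id": "user1"},
}
articles.set(event["article"]["id"], reduce_article(event))

# The same article arrives again with a new title and overwrites the old one.
updated = {**event, "article": {"id": "123foo", "title": "Updated title"}}
articles.set(updated["article"]["id"], reduce_article(updated))
print(articles.get("123foo"))
```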
Our second pipeline consumes comments, caches them in case a subsequent comment references them, obtains each comment's parent (article or comment), and attaches the root article to the event before sending it to our output topic.
In this config we make use of the `branch` processor, as it allows us to reduce documents into smaller maps for caching and gives us greater control over how results are mapped back into the document.
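The branching idea can be modelled in a few lines of Python. This is a conceptual sketch only, not how Benthos implements the processor: a request map reduces the document, some processing runs on the reduced form, and a result map copies selected parts of the result back into the original document. The function `branch`, its parameters, the skip-on-`None` rule, and the event fields are all assumptions of this sketch.

```python
def branch(document, request_map, process, result_map):
    """Run `process` on a reduced view of `document` and merge the result back."""
    request = request_map(document)
    if request is None:
        # Nothing to look up, so the document passes through unchanged.
        return document
    result = process(request)
    merged = dict(document)  # shallow copy; the original is not modified
    result_map(merged, result)
    return merged


cache = {"123foo": {"article": {"title": "Example article"}}}
comment = {
    "type": "comment",
    "comment": {"id": "456bar", "parent_id": "123foo"},
    "user": {"id": "user2"},
}

hydrated = branch(
    comment,
    # Reduce the document to just the parent ID.
    request_map=lambda doc: doc["comment"].get("parent_id"),
    # Look the parent up in the cache.
    process=lambda parent_id: cache[parent_id],
    # Copy only the article from the cached parent into the document.
    result_map=lambda doc, parent: doc.update(article=parent["article"]),
)
print(hydrated["article"])
```

Because only the mapped fields are written back, the rest of the comment (here `user` and `comment`) is untouched by whatever happens inside the branch.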
This pipeline satisfies our basic needs, but errors aren't handled at all: intermittent cache connectivity problems that outlast our cache retries will result in failed documents entering our `comments_hydrated` topic. The same happens if a comment arrives in our pipeline before its parent.
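The ordering problem is easy to reproduce in a simplified model. In the sketch below (hypothetical helper and field names, with a dict standing in for the cache), a comment arrives before its parent article has been cached, and because nothing checks for the failed lookup the comment is still published, without its article.

```python
def run_without_error_handling(comments, cache):
    """Hydrate where possible and forward everything, failed or not."""
    output_topic = []
    for event in comments:
        parent = cache.get(event["comment"]["parent_id"])
        if parent is not None:
            event = {**event, "article": parent["article"]}
        # No error handling: the event is forwarded either way.
        output_topic.append(event)
    return output_topic


cache = {}  # the parent article has not been cached yet
early_comment = {
    "type": "comment",
    "comment": {"id": "456bar", "parent_id": "123foo"},
    "user": {"id": "user2"},
}

published = run_without_error_handling([early_comment], cache)
print("article" in published[0])
```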
There are many patterns for error handling to choose from in Benthos. In this example we're going to introduce a delayed retry queue, as it lets us reprocess failed documents after a grace period while keeping that reprocessing isolated from our main pipeline.
Adding a Retry Queue
Our retry queue is going to be another topic called `comments_retry`. Since most errors are time-related (a parent that hasn't arrived yet, or a cache that is temporarily unreachable), we will delay retry attempts by storing the current timestamp after a failed request as a metadata field.
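The delay logic amounts to simple arithmetic on that timestamp. In the sketch below, the metadata key `last_attempted`, the 60 second grace period, and both helper functions are assumptions chosen for illustration: a failed document is stamped with the time of the attempt, and a document read back from the retry topic waits only for whatever part of the grace period has not already elapsed.

```python
GRACE_PERIOD_SECONDS = 60.0  # assumed value for this sketch


def mark_failed(metadata, now):
    """Return a copy of the metadata stamped with the time of the failed attempt."""
    return {**metadata, "last_attempted": now}


def remaining_delay(metadata, now, grace=GRACE_PERIOD_SECONDS):
    """Seconds to wait before retrying; zero for documents that never failed."""
    last_attempted = metadata.get("last_attempted")
    if last_attempted is None:
        return 0.0
    return max(0.0, grace - (now - last_attempted))


# A document fails at t=100 and is read back from the retry topic at t=130.
metadata = mark_failed({}, now=100.0)
print(remaining_delay(metadata, now=130.0))
```

Time already spent sitting in the retry topic counts towards the grace period, so a backlog of retries does not add extra delay on top.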
We will use an input [`broker`][input-broker] so that we can consume both the `comments` and `comments_retry` topics in the same pipeline.
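Conceptually, the broker merges several inputs into one stream that the rest of the pipeline treats uniformly. The Python sketch below models that with a round-robin merge over two in-memory lists. This is a simplification, since a real broker consumes its inputs concurrently, and the `broker` function and event shapes here are hypothetical.

```python
from collections import deque


def broker(inputs):
    """Yield (topic, event) pairs from several inputs in round-robin order."""
    queues = {topic: deque(events) for topic, events in inputs.items()}
    while any(queues.values()):
        for topic, queue in queues.items():
            if queue:
                yield topic, queue.popleft()


inputs = {
    "comments": [{"id": "c1"}, {"id": "c2"}],
    "comments_retry": [{"id": "r1"}],
}

# Both topics feed the same downstream processing.
merged = list(broker(inputs))
for topic, event in merged:
    print(topic, event["id"])
```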
Our config (omitting the caching sections for brevity) now looks like this:
With this config we can deploy as many instances of Benthos as we need.