A workflow is often expressed as a DAG of processing stages, where each stage can result in N possible next stages, until finally the flow ends at an exit node.
For example, if we had processing stages A, B, C and D, where stage A could result in either stage B or C being next, always followed by D, it might look something like this:
This flow would be easy to express in a standard Benthos config: we could simply
use a conditional processor to route to either B or C depending
on a condition on the result of A. However, this method of flow control quickly
becomes unfeasible as the DAG gets more complicated. Imagine expressing this
flow using conditional or switch processors:
And imagine doing so knowing that the diagram is subject to change over time. Yikes!
Automatic DAG Resolution
This method allows you to build your workflow without having to explicitly define (or even know) the final order of stages. Instead, you define how each individual stage mutates the original document and Benthos automatically resolves the necessary execution order, including parallel execution of stages within the same dependency tier.
For more information regarding these processors please check their respective documentation.
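As a rough illustration of the idea, the fragment below sketches two stages whose order is never stated explicitly. It assumes the processors in question are the process_dag and process_map processors, and that the http processor uses the nested request layout of the Benthos 3 series. The stage names, URLs and field paths are hypothetical.

```yaml
pipeline:
  processors:
  - process_dag:
      # Hypothetical stage: sends the whole document and stores the
      # `summary` field of the response at tmp.summary.
      summarise:
        processors:
        - http:
            request:
              url: http://example.com/summarise # placeholder URL
              verb: POST
        postmap:
          tmp.summary: summary

      # Hypothetical stage: its premap reads tmp.summary, which the stage
      # above provides, so it should be resolved to run after `summarise`.
      classify:
        premap:
          .: tmp.summary
        processors:
        - http:
            request:
              url: http://example.com/classify # placeholder URL
              verb: POST
        postmap:
          tmp.category: category
```

Neither stage names the other. The ordering is inferred because the premap source of one stage matches the postmap target of the other.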
Concepts like these are much easier to explain with a simple example, so let's define our workflow stages.
We are starting off with arbitrary JSON documents and want to make some HTTP
requests to a series of enrichment services where the end result will be placed
within the original JSON document at the path tmp.enrichments.qux.
The enrichment services, depending on the contents of the original payload, will return content that might need to be fed into a subsequent enrichment service in order to obtain the final result we are looking for.
Some of these services might fail due to the contents of the original message, in which case we want to send the full contents to a recovery service that will attempt to recover a fallback version of the goal enrichment.
The enrichment stages can be described as:
A) Perform an HTTP request to
fooserve with the full message payload as the
contents. The response can be one of the following:
- 400: Bad request
B) Perform an HTTP request to
barserve with the object
bar taken from the
output of stage A response 1. The response can be one of the following:
- 400: Bad request
C) Perform an HTTP request to
bazserve with the object
baz taken either from
the output of stage A response 2 or stage B response 1. The response can be one
of the following:
- 400: Bad request
D) If any previous stage returns a 400 then we perform an HTTP request to
recoverserve with the full message payload as the contents. The response will
always be the following:
E) Place the final contents (the object at
qux) in the document at tmp.enrichments.qux.
A diagram for this flow might look like this:
For simplicity we will attempt each HTTP request three times and interpret any
failure after those attempts as the equivalent of a
400 status code.
Stage A has no need for a premap since we are sending the entire original payload.
The postmap targets are either the object
bar or baz, and both are optional
since we aren't sure which will be present.
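A minimal sketch of how stage A might be written as an entry in a process_dag map. The /enrich URL path is an assumption, and the field names follow the Benthos 3 series http processor as far as I recall them.

```yaml
A:
  # No premap: the full original payload is sent as the request body.
  processors:
  - http:
      request:
        url: http://fooserve/enrich # path is an assumption
        verb: POST
        retries: 3        # retry limit before the request is abandoned
        drop_on: [ 400 ]  # treat a 400 as a failure without retrying
  # Optional targets: a missing source field is not an error.
  postmap_optional:
    tmp.enrichments.bar: bar
    tmp.enrichments.baz: baz
```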
The premap for stage B is
tmp.enrichments.bar which if not present in the
payload results in this stage being skipped. Stage A is the only stage that
provides tmp.enrichments.bar, so Benthos will make sure stage A is run before
stage B. If we were to add more stages later on that might provide
tmp.enrichments.bar then they will also automatically be run before this stage.
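Stage B could be sketched along the following lines, again as an entry in a process_dag map. The URL path is an assumption, and only the baz target confirmed by the stage descriptions is mapped.

```yaml
B:
  # The object at tmp.enrichments.bar becomes the root of the request body.
  # If that path is absent from the payload, the stage is skipped.
  premap:
    .: tmp.enrichments.bar
  processors:
  - http:
      request:
        url: http://barserve/enrich # path is an assumption
        verb: POST
        retries: 3
        drop_on: [ 400 ]
  postmap_optional:
    tmp.enrichments.baz: baz
```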
The premap for stage C is
tmp.enrichments.baz which if not present in the
payload results in this stage being skipped. Since either stage A or B might
provide this target they will both be run before this stage.
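A comparable sketch for stage C follows. The URL path is an assumption, as is the postmap from qux, which is inferred from stage E's description of the final contents.

```yaml
C:
  # Skipped unless some earlier stage has written tmp.enrichments.baz.
  premap:
    .: tmp.enrichments.baz
  processors:
  - http:
      request:
        url: http://bazserve/enrich # path is an assumption
        verb: POST
        retries: 3
        drop_on: [ 400 ]
  # Assumed: a successful response carries the final object under `qux`.
  postmap_optional:
    tmp.enrichments.qux: qux
```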
Stage D is unique since for this stage we need to send the entire contents of
the payload (which means no premap), but we still only wish to trigger this
stage once any other stage capable of providing tmp.enrichments.qux has
already been executed. Therefore, we add the field
tmp.enrichments.qux as an explicit dependency.
We also only wish to execute this stage when
tmp.enrichments.qux is missing
from the payload, therefore we add a condition that performs this check.
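One way stage D might be expressed is shown below. The dependencies and conditions fields are written as I understand the process_map options. The jmespath query is one possible way to test for a missing field and is an assumption, as is the URL path.

```yaml
D:
  # Explicit dependency: run only after every stage that can provide
  # tmp.enrichments.qux has had its turn.
  dependencies:
  - tmp.enrichments.qux
  # Execute only when the enrichment is still missing.
  conditions:
  - jmespath:
      query: "tmp.enrichments.qux == `null`"
  processors:
  - http:
      request:
        url: http://recoverserve/recover # path is an assumption
        verb: POST
        retries: 3
  postmap:
    tmp.enrichments.qux: qux
```

In JMESPath a path that does not resolve evaluates to null, so the query should return true both when tmp.enrichments is absent entirely and when only qux is missing.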
Stage E doesn't need writing explicitly since by the end of Stage D we should
already have our enrichment at tmp.enrichments.qux.
The final Benthos configuration might look something like this:
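The following is a sketch of such a configuration rather than a definitive one. It assumes the process_dag processor with process_map style stages and the Benthos 3 series http processor layout. The URL paths, the jmespath query and the qux postmap on stage C are assumptions.

```yaml
pipeline:
  processors:
  - process_dag:
      A:
        processors:
        - http:
            request:
              url: http://fooserve/enrich # path is an assumption
              verb: POST
              retries: 3
              drop_on: [ 400 ]
        postmap_optional:
          tmp.enrichments.bar: bar
          tmp.enrichments.baz: baz

      B:
        premap:
          .: tmp.enrichments.bar
        processors:
        - http:
            request:
              url: http://barserve/enrich # path is an assumption
              verb: POST
              retries: 3
              drop_on: [ 400 ]
        postmap_optional:
          tmp.enrichments.baz: baz

      C:
        premap:
          .: tmp.enrichments.baz
        processors:
        - http:
            request:
              url: http://bazserve/enrich # path is an assumption
              verb: POST
              retries: 3
              drop_on: [ 400 ]
        postmap_optional:
          tmp.enrichments.qux: qux # assumed response field

      D:
        dependencies:
        - tmp.enrichments.qux
        conditions:
        - jmespath:
            query: "tmp.enrichments.qux == `null`" # assumed check
        processors:
        - http:
            request:
              url: http://recoverserve/recover # path is an assumption
              verb: POST
              retries: 3
        postmap:
          tmp.enrichments.qux: qux
```

The order in which the stages are listed should not matter, since the execution order is derived from the premap, postmap and dependencies paths.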
Workflow stages can fail if any mandatory post mappings are unresolvable with result payloads, or if a processor within the stage fails. When these failures occur, Benthos offers many mechanisms for capturing and/or recovering from them, which are outlined in this document.
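As one hedged example of such a mechanism, a catch processor placed after the workflow can run extra processors only on messages flagged as failed. This sketch assumes a failed stage flags the message in the usual way. The log message is illustrative and the workflow stages themselves are left out.

```yaml
pipeline:
  processors:
  - process_dag: {} # placeholder: the workflow stages would go here
  # Applied only to messages that a previous processor flagged as failed.
  - catch:
    - log:
        level: ERROR
        message: "Enrichment workflow failed for this message"
```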