Skip to main content

dedupe

Deduplicates messages by storing a key value in a cache using the add operator. If the key already exists within the cache it is dropped.

# Config fields, showing default values
label: ""
dedupe:
cache: "" # No default (required)
key: ${! meta("kafka_key") } # No default (required)
drop_on_err: true

Caches must be configured as resources, for more information check out the cache documentation here.

When using this processor with an output target that might fail you should always wrap the output within an indefinite retry block. This ensures that during outages your messages aren't reprocessed after failures, which would result in messages being dropped.

Batch Deduplication

This processor enacts on individual messages only, in order to perform a deduplication on behalf of a batch (or window) of messages instead use the cache processor.

Delivery Guarantees

Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Benthos pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Benthos instance (or a server crash, etc).

This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Benthos pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implement idempotent behaviour at the edge of your stream pipelines.

Fields

cache

The cache resource to target with this processor.

Type: string

key

An interpolated string yielding the key to deduplicate by for each message. This field supports interpolation functions.

Type: string

# Examples

key: ${! meta("kafka_key") }

key: ${! content().hash("xxhash64") }

drop_on_err

Whether messages should be dropped when the cache returns a general error such as a network issue.

Type: bool
Default: true

Examples

The following configuration demonstrates a pipeline that deduplicates messages based on the Kafka key.

pipeline:
processors:
- dedupe:
cache: keycache
key: ${! meta("kafka_key") }

cache_resources:
- label: keycache
memory:
default_ttl: 60s