
group_by_value

Splits a batch of messages into N batches, where each resulting batch contains a group of messages determined by a function interpolated string evaluated per message.

# Config fields, showing default values
label: ""
group_by_value:
  value: ${! meta("kafka_key") } # No default (required)

This allows you to group messages using arbitrary fields within their content or metadata, process each group individually, and send each group to a unique destination.

The functionality of this processor depends on being applied across messages that are batched. You can find out more in the batching documentation.
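For instance, a minimal sketch of an input that produces such batches might look like the following (the broker address, topic, consumer group, and batching thresholds here are illustrative assumptions, not defaults):

input:
  kafka:
    addresses: [ localhost:9092 ] # assumed broker address
    topics: [ example_topic ]     # assumed topic name
    consumer_group: example_group # assumed consumer group
    batching:
      count: 50  # flush a batch after 50 messages
      period: 1s # or after one second, whichever comes first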

Fields

value

The interpolated string to group based on. This field supports interpolation functions.

Type: string

# Examples

value: ${! meta("kafka_key") }

value: ${! json("foo.bar") }-${! meta("baz") }

Examples

If we were consuming Kafka messages and needed to group them by their key, archive the groups, and send them to S3 with the key as part of the path, we could achieve that with the following:

pipeline:
  processors:
    - group_by_value:
        value: ${! meta("kafka_key") }
    - archive:
        format: tar
    - compress:
        algorithm: gzip

output:
  aws_s3:
    bucket: TODO
    path: docs/${! meta("kafka_key") }/${! count("files") }-${! timestamp_unix_nano() }.tar.gz
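The grouping value can also combine message content with metadata, as shown in the field examples above. A minimal sketch reusing those example expressions (the foo.bar field and baz metadata key are illustrative, not real defaults):

pipeline:
  processors:
    # Group by a composite key built from a JSON field and a metadata value
    - group_by_value:
        value: ${! json("foo.bar") }-${! meta("baz") }
    - archive:
        format: tar
    - compress:
        algorithm: gzip

output:
  aws_s3:
    bucket: TODO # replace with a real bucket
    path: groups/${! meta("baz") }/${! count("files") }-${! timestamp_unix_nano() }.tar.gz

Note that the output path only interpolates metadata here, since after the archive and compress processors the message content is a gzipped tarball rather than JSON.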