memory

Stores consumed messages in memory and acknowledges them at the input level. During shutdown Benthos will make a best attempt at flushing all remaining messages before exiting cleanly.

# Common config fields, showing default values
buffer:
  memory:
    limit: 524288000
    batch_policy:
      enabled: false
      count: 0
      byte_size: 0
      period: ""

This buffer is appropriate when consuming messages from inputs that do not gracefully handle back pressure and where delivery guarantees aren't critical.

This buffer has a configurable limit: once the total size of messages in the buffer reaches this amount, consumption stops and back pressure is applied upstream. Because this calculation is only an estimate, and the real size of messages in RAM is always higher, it is recommended to set the limit significantly below the amount of RAM available.
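As a rough illustration, a host with around 1 GiB of RAM available to Benthos might cap the buffer at 100 MiB. The value below is an assumed example, not a recommended setting; the right amount of headroom depends on the messages and the rest of the pipeline.

```yaml
buffer:
  memory:
    # 100 MiB = 100 * 1024 * 1024 bytes (illustrative value only).
    # The default is 524288000 (500 MiB).
    limit: 104857600
```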

Batching

It is possible to batch up messages sent from this buffer using a batch policy.
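A minimal sketch of a count-based batch policy, using only the fields documented on this page. The count of 100 is an arbitrary example value.

```yaml
buffer:
  memory:
    limit: 524288000
    batch_policy:
      # Batching is off by default, so it has to be enabled explicitly.
      enabled: true
      # Flush a batch once 100 messages have accumulated (assumed value).
      count: 100
```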

Fields

limit

The maximum buffer size (in bytes) to allow before applying back pressure upstream.

Type: number
Default: 524288000

batch_policy

Optionally configure a policy to flush buffered messages in batches.

Type: object
Default: {"byte_size":0,"condition":{"static":false,"type":"static"},"count":0,"enabled":false,"period":"","processors":[]}

batch_policy.enabled

Whether to batch messages as they are flushed.

Type: bool
Default: false

batch_policy.count

The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.

Type: number
Default: 0

batch_policy.byte_size

The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.

Type: number
Default: 0

batch_policy.period

A period after which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms
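The size and period fields can be combined so that a batch is flushed when it reaches the size threshold or when the period elapses, whichever comes first. The following is a sketch with assumed example values (1 MiB and 500ms), not tuned settings.

```yaml
buffer:
  memory:
    limit: 524288000
    batch_policy:
      enabled: true
      # Flush once the batch reaches 1 MiB (1024 * 1024 bytes) ...
      byte_size: 1048576
      # ... or after 500ms, even if the batch is still incomplete.
      period: 500ms
```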

batch_policy.condition

A condition to test against each message entering the batch. If this condition resolves to true, the batch is flushed.

Type: object
Default: {"static":false,"type":"static"}
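For reference, the JSON default above corresponds to the following YAML. This is only the default restated: a static condition that always resolves to false, so it never triggers a flush by itself. Other condition types are not covered on this page and are not shown here.

```yaml
buffer:
  memory:
    batch_policy:
      enabled: true
      condition:
        type: static
        # Always false, so the condition never flushes the batch.
        static: false
```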

batch_policy.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
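To show where the processors list sits in a full buffer config, here is a sketch that combines it with the other batch policy fields. The count and period are assumed example values; the `archive` processor with `format: lines` is taken from the examples above.

```yaml
buffer:
  memory:
    limit: 524288000
    batch_policy:
      enabled: true
      # Flush after 50 messages or 5 seconds (illustrative values).
      count: 50
      period: 5s
      processors:
        # Applied to each batch as it is flushed, using the archive
        # processor with the lines format from the examples above.
        - archive:
            format: lines
```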