This component is experimental and therefore subject to change or removal outside of major version releases.

Executes a query and creates a message for each document received.

Introduced in version 3.64.0.

# Common config fields, showing default values
label: ""
url: mongodb://localhost:27017 # No default (required)
database: "" # No default (required)
username: ""
password: ""
collection: "" # No default (required)
query: |2 # No default (required)
root.from = {"$lte": timestamp_unix()} = {"$gte": timestamp_unix()}
auto_replay_nacks: true
batch_size: 1000 # No default (optional)
sort: {} # No default (optional)
limit: 0 # No default (optional)

Once the documents from the query are exhausted, this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a sequence to execute).



The URL of the target MongoDB server.

Type: string

# Examples

url: mongodb://localhost:27017


The name of the target MongoDB database.

Type: string


The username to connect to the database.

Type: string
Default: ""


The password to connect to the database.


This field contains sensitive information that usually shouldn't be added to a config directly, read our secrets page for more info.

Type: string
Default: ""


The collection to select from.

Type: string


The mongodb operation to perform.

Type: string
Default: "find"
Requires version 4.2.0 or newer
Options: find, aggregate.


The json_marshal_mode setting is optional and controls the format of the output message.

Type: string
Default: "canonical"
Requires version 4.7.0 or newer

canonicalA string format that emphasizes type preservation at the expense of readability and interoperability. That is, conversion from canonical to BSON will generally preserve type information except in certain specific cases.
relaxedA string format that emphasizes readability and interoperability at the expense of type preservation.That is, conversion from relaxed format to BSON can lose type information.


Bloblang expression describing MongoDB query.

Type: string

# Examples

query: |2
root.from = {"$lte": timestamp_unix()} = {"$gte": timestamp_unix()}


Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

Type: bool
Default: true


A explicit number of documents to batch up before flushing them for processing. Must be greater than 0. Operations: find, aggregate

Type: int
Requires version 4.26.0 or newer

# Examples

batch_size: 1000


An object specifying fields to sort by, and the respective sort order (1 ascending, -1 descending). Note: The driver currently appears to support only one sorting key. Operations: find

Type: object
Requires version 4.26.0 or newer

# Examples

name: 1

age: -1


An explicit maximum number of documents to return. Operations: find

Type: int
Requires version 4.26.0 or newer