azure_cosmosdb

EXPERIMENTAL

This component is experimental and therefore subject to change or removal outside of major version releases.

Executes a SQL query against Azure CosmosDB and creates a batch of messages from each page of items.

Introduced in version v4.25.0.

# Common config fields, showing default values
input:
  label: ""
  azure_cosmosdb:
    endpoint: https://localhost:8081 # No default (optional)
    account_key: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    connection_string: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    database: testdb # No default (required)
    container: testcontainer # No default (required)
    partition_keys_map: root = "blobfish" # No default (required)
    query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = "baz" AND c.timestamp < @timestamp # No default (required)
    args_mapping: |- # No default (optional)
      root = [
        { "Name": "@name", "Value": "benthos" },
      ]
    auto_replay_nacks: true

Cross-partition Queries

Cross-partition queries are currently not supported by the underlying driver. For every query, the PartitionKey value(s) must be known in advance and specified in the config. See details here.
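When a handful of partition key values are known in advance, one possible workaround is to run one azure_cosmosdb input per partition key and combine them. The following is a minimal sketch, assuming the generic broker input is used to merge the results; the partition key values "AbyssalPlain" and "HadalZone" and the COSMOSDB_ACCOUNT_KEY environment variable are hypothetical.

```yaml
input:
  broker:
    inputs:
      # One child input per known partition key value (values are hypothetical).
      - azure_cosmosdb:
          endpoint: https://localhost:8081
          account_key: "${COSMOSDB_ACCOUNT_KEY}" # hypothetical environment variable
          database: blobbase
          container: blobfish
          partition_keys_map: root = "AbyssalPlain"
          query: SELECT * FROM blobfish AS b
      - azure_cosmosdb:
          endpoint: https://localhost:8081
          account_key: "${COSMOSDB_ACCOUNT_KEY}"
          database: blobbase
          container: blobfish
          partition_keys_map: root = "HadalZone"
          query: SELECT * FROM blobfish AS b
```

Each child input still issues a single-partition query, so this is not a true cross-partition query and only scales to a small, fixed set of keys.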

Credentials

You can use one of the following authentication mechanisms:

  • Set the endpoint field and the account_key field
  • Set only the endpoint field to use DefaultAzureCredential
  • Set the connection_string field
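The three options above could be configured roughly as follows. This is a sketch only: the environment variable names COSMOSDB_ACCOUNT_KEY and COSMOSDB_CONNECTION_STRING are hypothetical, and only one option should be active at a time.

```yaml
input:
  azure_cosmosdb:
    # Option 1: endpoint plus account key.
    endpoint: https://localhost:8081
    account_key: "${COSMOSDB_ACCOUNT_KEY}" # hypothetical environment variable

    # Option 2: endpoint only, which uses DefaultAzureCredential.
    # endpoint: https://localhost:8081

    # Option 3: connection string only.
    # connection_string: "${COSMOSDB_CONNECTION_STRING}" # hypothetical environment variable

    database: testdb
    container: testcontainer
    partition_keys_map: root = "blobfish"
    query: SELECT * FROM testcontainer
```

Reading secrets from environment variables keeps them out of the config file itself.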

Metadata

This component adds the following metadata fields to each message:

- activity_id
- request_charge

You can access these metadata fields using function interpolation.
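As an illustration, the metadata fields could be logged with function interpolation or copied into the message body with a mapping. This is a minimal sketch; the target field names cosmos_activity_id and cosmos_request_charge are hypothetical.

```yaml
pipeline:
  processors:
    # Log the request charge of the page this message came from.
    - log:
        level: INFO
        message: 'Cosmos activity ${! metadata("activity_id") } cost ${! metadata("request_charge") } RUs'
    # Copy the metadata into the document itself (field names are hypothetical).
    - mapping: |
        root = this
        root.cosmos_activity_id = metadata("activity_id")
        root.cosmos_request_charge = metadata("request_charge")
```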

Examples

Execute a parametrized SQL query to select documents from a container.

input:
  azure_cosmosdb:
    endpoint: http://localhost:8080
    account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
    database: blobbase
    container: blobfish
    partition_keys_map: root = "AbyssalPlain"
    query: SELECT * FROM blobfish AS b WHERE b.species = @species
    args_mapping: |
      root = [
        { "Name": "@species", "Value": "smooth-head" },
      ]

Fields

endpoint

CosmosDB endpoint.

Type: string

# Examples

endpoint: https://localhost:8081

account_key

Account key.

Secret

This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.

Type: string

# Examples

account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==

connection_string

Connection string.

Secret

This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.

Type: string

# Examples

connection_string: AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;

database

Database.

Type: string

# Examples

database: testdb

container

Container.

Type: string

# Examples

container: testcontainer

partition_keys_map

A Bloblang mapping which should evaluate to a single partition key value or an array of partition key values of type string, integer or boolean. Currently, hierarchical partition keys are not supported so only one value may be provided.

Type: string

# Examples

partition_keys_map: root = "blobfish"

partition_keys_map: root = 41

partition_keys_map: root = true

partition_keys_map: root = null

partition_keys_map: root = now().ts_format("2006-01-02")

query

The query to execute.

Type: string

# Examples

query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = "baz" AND c.timestamp < @timestamp

args_mapping

A Bloblang mapping that, for each message, creates a list of arguments to use with the query.

Type: string

# Examples

args_mapping: |-
  root = [
    { "Name": "@name", "Value": "benthos" },
  ]
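Each entry in the list pairs a parameter name used in the query with the value to bind to it. A minimal sketch of a query and a matching args_mapping follows; it assumes c.timestamp is stored as a Unix timestamp in seconds, and the COSMOSDB_ACCOUNT_KEY environment variable is hypothetical.

```yaml
input:
  azure_cosmosdb:
    endpoint: https://localhost:8081
    account_key: "${COSMOSDB_ACCOUNT_KEY}" # hypothetical environment variable
    database: testdb
    container: testcontainer
    partition_keys_map: root = "blobfish"
    # Every @parameter in the query has a matching "Name" in args_mapping.
    query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = @bar AND c.timestamp < @timestamp
    args_mapping: |
      root = [
        { "Name": "@bar", "Value": "baz" },
        # Assumes c.timestamp holds Unix seconds.
        { "Name": "@timestamp", "Value": now().ts_unix() },
      ]
```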

batch_count

The maximum number of messages that should be accumulated into each batch. Use '-1' to specify a dynamic page size.

Type: int
Default: -1
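For example, a fixed upper bound on the batch size could be set like this. The value 100 is arbitrary and chosen only for illustration, as is the COSMOSDB_ACCOUNT_KEY environment variable.

```yaml
input:
  azure_cosmosdb:
    endpoint: https://localhost:8081
    account_key: "${COSMOSDB_ACCOUNT_KEY}" # hypothetical environment variable
    database: testdb
    container: testcontainer
    partition_keys_map: root = "blobfish"
    query: SELECT * FROM testcontainer
    # At most 100 messages per batch; the default of -1 uses a dynamic page size.
    batch_count: 100
```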

auto_replay_nacks

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

Type: bool
Default: true

CosmosDB Emulator

If you wish to run the CosmosDB emulator that is referenced in the documentation here, the following Docker command should do the trick:

> docker run --rm -it -p 8081:8081 --name=cosmosdb -e AZURE_COSMOS_EMULATOR_PARTITION_COUNT=10 -e AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE=false mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator

Note: AZURE_COSMOS_EMULATOR_PARTITION_COUNT controls the number of partitions that will be supported by the emulator. The bigger the value, the longer it takes for the container to start up.

Additionally, instead of installing the container's self-signed certificate, which is exposed via https://localhost:8081/_explorer/emulator.pem, you can run mitmproxy like so:

> mitmproxy -k --mode "reverse:https://localhost:8081"

Then you can access the CosmosDB UI via http://localhost:8080/_explorer/index.html and use http://localhost:8080 as the CosmosDB endpoint.