Bloblang Functions

Functions can be placed anywhere and allow you to extract information from your environment, generate values, or access data from the underlying message being mapped:

root.doc.id = uuid_v4()
root.doc.received_at = now()
root.doc.host = hostname()

General#

count#

The count function is a counter starting at 1 which increments after each time it is called. Count takes an argument which is an identifier for the counter, allowing you to specify multiple unique counters in your configuration.

root = this
root.id = count("bloblang_function_example")
# In: {"message":"foo"}
# Out: {"id":1,"message":"foo"}
# In: {"message":"bar"}
# Out: {"id":2,"message":"bar"}

deleted#

A function that returns a result indicating that the mapping target should be deleted.

root = this
root.bar = deleted()
# In: {"bar":"bar_value","baz":"baz_value","foo":"foo value"}
# Out: {"baz":"baz_value","foo":"foo value"}

Since the result is a value it can be used to do things like remove elements of an array within map_each.

root.new_nums = this.nums.map_each(
match this {
this < 10 => deleted()
_ => this - 10
}
)
# In: {"nums":[3,11,4,17]}
# Out: {"new_nums":[1,7]}

range#

The range function creates an array of integers following a range between a start, stop and optional step integer argument. If the step argument is omitted then it defaults to 1. A negative step can be provided as long as stop < start.

root.a = range(0, 10)
root.b = range(0, this.max, 2)
root.c = range(0, -this.max, -2)
# In: {"max":10}
# Out: {"a":[0,1,2,3,4,5,6,7,8,9],"b":[0,2,4,6,8],"c":[0,-2,-4,-6,-8]}

throw#

Throws an error similar to a regular mapping error. This is useful for abandoning a mapping entirely given certain conditions.

root.doc.type = match {
this.exists("header.id") => "foo"
this.exists("body.data") => "bar"
_ => throw("unknown type")
}
root.doc.contents = (this.body.content | this.thing.body)
# In: {"header":{"id":"first"},"thing":{"body":"hello world"}}
# Out: {"doc":{"contents":"hello world","type":"foo"}}
# In: {"nothing":"matches"}
# Out: Error("failed to execute mapping query at line 1: unknown type")

uuid_v4#

Generates a new RFC-4122 UUID each time it is invoked and prints a string representation.

root.id = uuid_v4()

random_int#

Generates a non-negative pseudo-random 64-bit integer. An optional integer argument can be provided in order to seed the random number generator.

root.first = random_int()
root.second = random_int(1)

Message Info#

batch_index#

Returns the index of the mapped message within a batch. This is useful for applying maps only on certain messages of a batch.

root = if batch_index() > 0 { deleted() }

batch_size#

Returns the size of the message batch.

root.foo = batch_size()

content#

Returns the full raw contents of the mapping target message as a byte array. When mapping to a JSON field the value should be encoded using the method encode, or cast to a string directly using the method string, otherwise it will be base64 encoded by default.

root.doc = content().string()
# In: {"foo":"bar"}
# Out: {"doc":"{\"foo\":\"bar\"}"}

error#

If an error has occurred during the processing of a message this function returns the reported cause of the error. For more information about error handling patterns read here.

root.doc.error = error()

errored#

Returns a boolean value indicating whether an error has occurred during the processing of a message. For more information about error handling patterns read here.

root.doc.status = if errored() { 400 } else { 200 }

json#

Returns the value of a field within a JSON message located by a dot path argument. This function always targets the entire source JSON document regardless of the mapping context.

root.mapped = json("foo.bar")
# In: {"foo":{"bar":"hello world"}}
# Out: {"mapped":"hello world"}

The path argument is optional and if omitted the entire JSON payload is returned.

root.doc = json()
# In: {"foo":{"bar":"hello world"}}
# Out: {"doc":{"foo":{"bar":"hello world"}}}

meta#

Returns the value of a metadata key from the input message. Since values are extracted from the read-only input message they do NOT reflect changes made from within the map. If you wish to store the results of queries to be reused within the same mapping then instead use variables.

root.topic = meta("kafka_topic")

The parameter is optional and if omitted the entire metadata contents are returned as a JSON object.

root.all_metadata = meta()

Environment#

env#

Returns the value of an environment variable.

root.thing.key = env("key")

file#

BETA: This function is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with it is found.

Reads a file and returns its contents. Relative paths are resolved from the directory of the process executing the mapping.

root.doc = file(env("BENTHOS_TEST_BLOBLANG_FILE")).parse_json()
# In: {}
# Out: {"doc":{"foo":"bar"}}

hostname#

Returns a string matching the hostname of the machine running Benthos.

root.thing.host = hostname()

now#

Returns the current timestamp as a string in ISO 8601 format with the local timezone. Use the method format_timestamp in order to change the format and timezone.

root.received_at = now()
root.received_at = now().format_timestamp("Mon Jan 2 15:04:05 -0700 MST 2006", "UTC")

timestamp_unix#

Returns the current unix timestamp in seconds.

root.received_at = timestamp_unix()

timestamp_unix_nano#

Returns the current unix timestamp in nanoseconds.

root.received_at = timestamp_unix_nano()

Deprecated#

timestamp#

Returns the current time in a custom format specified by the argument. The format is defined by showing how the reference time, defined to be Mon Jan 2 15:04:05 -0700 MST 2006 would be displayed if it were the value.

A fractional second is represented by adding a period and zeros to the end of the seconds section of layout string, as in 15:04:05.000 to format a time stamp with millisecond precision. This has been deprecated in favour of the new now function.

root.received_at = timestamp("15:04:05")

timestamp_utc#

The equivalent of timestamp except the time is printed as UTC instead of the local timezone. This has been deprecated in favour of the new now function.

root.received_at = timestamp_utc("15:04:05")