aws_s3

EXPERIMENTAL: This component is experimental and therefore subject to change or removal outside of major version releases.

This input is a refactor of the current stable (and shorter named) s3 input which is still the recommended one to use until this input is considered stable. However, this input has improved capabilities and will eventually replace it.

# Common config fields, showing default values
input:
aws_s3:
bucket: ""
prefix: ""
region: eu-west-1
codec: all-bytes
sqs:
url: ""
key_path: Records.*.s3.object.key
bucket_path: Records.*.s3.bucket.name
envelope_path: ""

Downloads objects within an S3 bucket, optionally filtered by a prefix. If an SQS queue has been configured then only object keys read from the queue will be downloaded.

If an SQS queue is not specified the entire list of objects found when this input starts will be consumed.

Downloading Objects on Upload with SQS

A common pattern for consuming S3 objects is to emit upload notification events from the bucket either directly to an SQS queue, or to an SNS topic that is consumed by an SQS queue, and then have your consumer listen for events which prompt it to download the newly uploaded objects. More information about this pattern and how to set it up can be found at: https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html.

Benthos is able to follow this pattern when you configure an sqs.url, where it consumes events from SQS and only downloads object keys received within those events. In order for this to work Benthos needs to know where within the event the key and bucket names can be found, specified as dot paths with the fields sqs.key_path and sqs.bucket_path. The default values for these fields should already be correct when following the guide above.

If your notification events are being routed to SQS via an SNS topic then the events will be enveloped by SNS, in which case you also need to specify the field sqs.envelope_path, which in the case of SNS to SQS will usually be Message.

When using SQS please make sure you have sensible values for sqs.max_messages and also the visibility timeout of the queue itself. When Benthos consumes an S3 object the SQS message that triggered it is not deleted until the S3 object has been sent onwards. This ensures at-least-once crash resiliency, but also means that if the S3 object takes longer to process than the visibility timeout of your queue then the same objects might be processed multiple times.

Downloading Large Files

When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a codec can be specified that determines how to break the input into smaller individual messages.

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Metadata

This input adds the following metadata fields to each message:

- s3_key
- s3_bucket
- s3_last_modified_unix
- s3_last_modified (RFC3339)
- s3_content_type
- s3_content_encoding
- All user defined metadata

You can access these metadata fields using function interpolation.

Fields

bucket

The bucket to consume from. If the field sqs.url is specified this field is optional.

Type: string
Default: ""

prefix

An optional path prefix, if set only objects with the prefix are consumed.

Type: string
Default: ""

region

The AWS region to target.

Type: string
Default: "eu-west-1"

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""

credentials

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

credentials.id

The ID of credentials to use.

Type: string
Default: ""

credentials.secret

The secret for the credentials being used.

Type: string
Default: ""

credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

credentials.role

A role ARN to assume.

Type: string
Default: ""

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string
Default: ""

force_path_style_urls

Forces the client API to use path style URLs for downloading keys, which is often required when connecting to custom endpoints.

Type: bool
Default: false

delete_objects

Whether to delete downloaded objects from the bucket once they are processed.

Type: bool
Default: false

codec

The way in which the bytes of consumed files are converted into messages, codecs are useful for specifying how large files might be processed in small chunks rather than loading it all in memory. It's possible to consume lines using a custom delimiter with the delim:x codec, where x is the character sequence custom delimiter.

Type: string
Default: "all-bytes"

OptionSummary
autoEXPERIMENTAL: Attempts to derive a codec for each file based on information such as the extension. For example, a .tar.gz file would be consumed with the tar-gzip codec. Defaults to all-bytes.
all-bytesConsume the entire file as a single binary message.
csvConsume structured rows as comma separated values, the first row must be a header row.
csv-gzipConsume structured rows as comma separated values from a gzip compressed file, the first row must be a header row.
delim:xConsume the file in segments divided by a custom delimter.
linesConsume the file in segments divided by linebreaks.
tarParse the file as a tar archive, and consume each file of the archive as a message.
tar-gzipParse the file as a gzip compressed tar archive, and consume each file of the archive as a message.
# Examples
codec: lines
codec: "delim:\t"
codec: delim:foobar

sqs

Consume SQS messages in order to trigger key downloads.

Type: object

sqs.url

An optional SQS URL to connect to. When specified this queue will control which objects are downloaded.

Type: string
Default: ""

sqs.endpoint

A custom endpoint to use when connecting to SQS.

Type: string
Default: ""

sqs.key_path

A dot path whereby object keys are found in SQS messages.

Type: string
Default: "Records.*.s3.object.key"

sqs.bucket_path

A dot path whereby the bucket name can be found in SQS messages.

Type: string
Default: "Records.*.s3.bucket.name"

sqs.envelope_path

A dot path of a field to extract an enveloped JSON payload for further extracting the key and bucket from SQS messages. This is specifically useful when subscribing an SQS queue to an SNS topic that receives bucket events.

Type: string
Default: ""

# Examples
envelope_path: Message

sqs.max_messages

The maximum number of SQS messages to consume from each request.

Type: number
Default: 10