snowflake_put
EXPERIMENTAL
This component is experimental and therefore subject to change or removal outside of major version releases.
Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables.
- Common
- Advanced
# Common config fields, showing default values
output:
  label: ""
  snowflake_put:
    account: ""
    region: ""
    cloud: ""
    user: ""
    password: ""
    private_key_file: ""
    private_key_pass: ""
    role: ""
    database: ""
    warehouse: ""
    schema: ""
    stage: ""
    path: ""
    compression: AUTO
    snowpipe: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 1
# All config fields, showing default values
output:
  label: ""
  snowflake_put:
    account: ""
    region: ""
    cloud: ""
    user: ""
    password: ""
    private_key_file: ""
    private_key_pass: ""
    role: ""
    database: ""
    warehouse: ""
    schema: ""
    stage: ""
    path: ""
    upload_parallel_threads: 4
    compression: AUTO
    snowpipe: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: []
    max_in_flight: 1
In order to use a different stage and/or Snowpipe for each message, you can use function interpolations as described here. When using batching, messages are grouped by the calculated stage and Snowpipe and are streamed to individual files in their corresponding stage; optionally, a Snowpipe insertFiles REST API call is made for each individual file.
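For example, the following sketch (illustrative values only; it assumes an upstream processor sets a hypothetical table_name metadata field on each message) derives both the table stage and the Snowpipe name from that metadata:

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    path: benthos
    # Hypothetical: table_name metadata is set by an upstream processor
    stage: '@%${! meta("table_name") }'
    snowpipe: '${! meta("table_name") }_PIPE'
    compression: AUTO
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate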
Credentials
Two authentication mechanisms are supported:
- User/password
- Key Pair Authentication
User/password
This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with Snowpipe.
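As a rough sketch (illustrative values; the password is read from a hypothetical SNOWFLAKE_PASSWORD environment variable via standard ${VAR} config interpolation), a user/password configuration might look like this, with no snowpipe set:

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    password: "${SNOWFLAKE_PASSWORD}"
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    compression: AUTO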
Key Pair Authentication
This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key beforehand. Please consult the documentation for details on how to set it up and assign the Public Key to your user.
Note that the Snowflake documentation suggests using this command:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
to generate an encrypted SSH private key. However, in this case it uses an encryption algorithm called pbeWithMD5AndDES-CBC, which is part of PKCS#5 v1.5 and is considered insecure. Due to this, Benthos does not support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them. One way of achieving this is to use the following command:
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
Alternatively, you can re-encrypt an existing key using this command:
openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8
Please consult this documentation for details.
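Once the key is in place, the relevant output fields are private_key_file and, for password-protected keys, private_key_pass. A minimal sketch (illustrative values; the passphrase is read from a hypothetical RSA_KEY_PASSPHRASE environment variable):

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    private_key_pass: "${RSA_KEY_PASSPHRASE}"
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    compression: AUTO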
Batching
It's common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an archive and/or compress processor.
For the optimal batch size, please consult the Snowflake documentation.
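For instance, the following sketch (the batch sizes are illustrative, not a recommendation) concatenates each batch and gzips it before upload, with compression set to GZIP so the output knows the data is already compressed:

output:
  snowflake_put:
    # ... connection fields omitted for brevity ...
    stage: "@%BENTHOS_TBL"
    compression: GZIP
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: concatenate
        - compress:
            algorithm: gzip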
Snowpipe
Given a table called BENTHOS_TBL with one column of type variant:
CREATE OR REPLACE TABLE BENTHOS_DB.PUBLIC.BENTHOS_TBL(RECORD variant)
and the following BENTHOS_PIPE Snowpipe:
CREATE OR REPLACE PIPE BENTHOS_DB.PUBLIC.BENTHOS_PIPE AUTO_INGEST = FALSE AS
  COPY INTO BENTHOS_DB.PUBLIC.BENTHOS_TBL FROM (SELECT * FROM @%BENTHOS_TBL)
  FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO)
you can configure Benthos to use the implicit table stage @%BENTHOS_TBL as the stage and BENTHOS_PIPE as the snowpipe. In this case, you must set compression to AUTO and, if using message batching, you'll need to configure an archive processor with the concatenate format. Since the compression is set to AUTO, the gosnowflake client library will compress the messages automatically, so you don't need to add a compress processor for message batches.
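Putting that together, a configuration along these lines (illustrative values, mirroring the table and pipe defined above) would stage batches on @%BENTHOS_TBL and trigger BENTHOS_PIPE for each uploaded file:

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    path: benthos
    stage: "@%BENTHOS_TBL"
    compression: AUTO
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate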
If you add STRIP_OUTER_ARRAY = TRUE in your Snowpipe FILE_FORMAT definition, then you must use json_array instead of concatenate as the archive processor format.
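In that case the batching processors would look like this instead:

batching:
  count: 10
  period: 3s
  processors:
    - archive:
        format: json_array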
Note: Only Snowpipes with FILE_FORMAT TYPE JSON are currently supported.
Snowpipe Troubleshooting
Snowpipe provides the insertReport and loadHistoryScan REST API endpoints which can be used to get information about recent Snowpipe calls. In order to query them, you'll first need to generate a valid JWT token for your Snowflake account. There are two methods for doing so:
- Using the snowsql utility:
snowsql --private-key-path rsa_key.p8 --generate-jwt -a <account> -u <user>
- Using the Python jwt-generator utility:
python3 jwt-generator.py --private_key_file_path=rsa_key.p8 --account=<account> --user=<user>
Once you successfully generate a JWT token and store it into the JWT_TOKEN environment variable, you can, for example, query the insertReport endpoint using curl:
curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://<account>.snowflakecomputing.com/v1/data/pipes/<database>.<schema>.<snowpipe>/insertReport"
If you need to pass in a valid requestId to any of these Snowpipe REST API endpoints, you can enable debug logging as described here and Benthos will print the RequestIDs that it sends to Snowpipe. They match the name of the file that is placed in the stage.
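As a minimal sketch, debug logging is typically enabled via the logger section of the config:

logger:
  level: DEBUG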
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
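As a rough sketch (the numbers are illustrative and should be tuned against your own workload), both knobs can be combined like so:

output:
  snowflake_put:
    # ... other fields as in the examples below ...
    max_in_flight: 8
    batching:
      count: 100
      period: 5s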
Examples
- No compression
- Automatic compression
- DEFLATE compression
- RAW_DEFLATE compression
Upload concatenated messages into a .json file to a table stage without calling Snowpipe.
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    path: benthos
    stage: "@%BENTHOS_TBL"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
Upload concatenated messages compressed automatically into a .gz archive file to a table stage without calling Snowpipe.
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    path: benthos
    stage: "@%BENTHOS_TBL"
    upload_parallel_threads: 4
    compression: AUTO
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
Upload concatenated messages compressed into a .deflate archive file to a table stage and call Snowpipe to load them into a table.
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    path: benthos
    stage: "@%BENTHOS_TBL"
    upload_parallel_threads: 4
    compression: DEFLATE
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
        - compress:
            algorithm: zlib
Upload concatenated messages compressed into a .rawdeflate archive file to a table stage and call Snowpipe to load them into a table.
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    path: benthos
    stage: "@%BENTHOS_TBL"
    upload_parallel_threads: 4
    compression: RAW_DEFLATE
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
        - compress:
            algorithm: flate
Fields
account
Account name, which is the same as the Account Identifier as described here. However, when using an Account Locator, the Account Identifier is formatted as <account_locator>.<region_id>.<cloud> and this field needs to be populated using the <account_locator> part.
Type: string
region
An optional region field which needs to be populated when using an Account Locator. It must be set to the <region_id> part of the Account Identifier (<account_locator>.<region_id>.<cloud>).
Type: string
# Examples
region: us-west-2
cloud
An optional cloud platform field which needs to be populated when using an Account Locator. It must be set to the <cloud> part of the Account Identifier (<account_locator>.<region_id>.<cloud>).
Type: string
# Examples
cloud: aws
cloud: gcp
cloud: azure
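For instance, a hypothetical Account Locator of xy12345.us-east-2.aws would be split across the three fields as:

account: xy12345
region: us-east-2
cloud: aws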
user
Username.
Type: string
password
An optional password.
Type: string
private_key_file
The path to a file containing the private SSH key.
Type: string
private_key_pass
An optional private SSH key passphrase.
Type: string
role
Role.
Type: string
database
Database.
Type: string
warehouse
Warehouse.
Type: string
schema
Schema.
Type: string
stage
Stage name. Use either one of the supported stage types. This field supports interpolation functions.
Type: string
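For illustration, Snowflake stage references take forms such as the following (hypothetical names; check the supported stage types for which of these apply here):

stage: "@%BENTHOS_TBL"
stage: "@BENTHOS_DB.PUBLIC.MY_NAMED_STAGE"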
path
Stage path.
Type: string
upload_parallel_threads
Specifies the number of threads to use for uploading files.
Type: int
Default: 4
compression
Compression type.
Type: string
Default: "AUTO"
| Option | Summary |
| --- | --- |
| AUTO | Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON. |
| DEFLATE | Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950). |
| GZIP | Messages must be pre-compressed using the gzip algorithm. |
| NONE | No compression is applied and messages must contain plain-text JSON. |
| RAW_DEFLATE | Messages must be pre-compressed using the flate algorithm (without header, RFC1951). |
snowpipe
An optional Snowpipe name. Use the <snowpipe> part from <database>.<schema>.<snowpipe>. This field supports interpolation functions.
Type: string
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.count
A number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1