= snowflake_put
:type: output
:status: beta
:categories: ["Services"]

////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables.

Introduced in version 4.0.0.

[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
output:
  label: ""
  snowflake_put:
    account: "" # No default (required)
    region: us-west-2 # No default (optional)
    cloud: aws # No default (optional)
    user: "" # No default (required)
    password: "" # No default (optional)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    role: "" # No default (required)
    database: "" # No default (required)
    warehouse: "" # No default (required)
    schema: "" # No default (required)
    stage: "" # No default (required)
    path: ""
    file_name: ""
    file_extension: ""
    compression: AUTO
    request_id: ""
    snowpipe: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 1
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
output:
  label: ""
  snowflake_put:
    account: "" # No default (required)
    region: us-west-2 # No default (optional)
    cloud: aws # No default (optional)
    user: "" # No default (required)
    password: "" # No default (optional)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    role: "" # No default (required)
    database: "" # No default (required)
    warehouse: "" # No default (required)
    schema: "" # No default (required)
    stage: "" # No default (required)
    path: ""
    file_name: ""
    file_extension: ""
    upload_parallel_threads: 4
    compression: AUTO
    request_id: ""
    snowpipe: "" # No default (optional)
    client_session_keep_alive: false
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 1
```
--
======

In order to use a different stage and/or Snowpipe for each message, you can use function interpolations as described in xref:configuration:interpolation.adoc#bloblang-queries[Bloblang queries]. When using batching, messages are grouped by the calculated stage and Snowpipe and are streamed to individual files in their corresponding stage and, optionally, a Snowpipe `insertFiles` REST API call will be made for each individual file.
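For example, the following sketch routes each message to a stage and Snowpipe taken from metadata fields. The metadata names `stage_name` and `pipe_name` are illustrative and assumed to be set by an upstream processor; credentials are placeholders.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    # Hypothetical metadata fields, e.g. set by an upstream mapping processor:
    #   meta stage_name = "@%BENTHOS_TBL"
    #   meta pipe_name  = "BENTHOS_PIPE"
    stage: ${! @stage_name }
    snowpipe: ${! @pipe_name }
    compression: AUTO
```

With batching enabled, messages that resolve to the same stage and Snowpipe end up in the same uploaded file.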
== Credentials

Two authentication mechanisms are supported:

- User/password
- Key Pair Authentication

=== User/password

This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with Snowpipe.

=== Key pair authentication

This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key beforehand. Please consult the https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication[documentation^] for details on how to set it up and assign the Public Key to your user.

Note that the Snowflake documentation https://twitter.com/felipehoffa/status/1560811785606684672[used to suggest^] using this command:

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
```

to generate an encrypted SSH private key. However, in this case, it uses an encryption algorithm called `pbeWithMD5AndDES-CBC`, which is part of PKCS#5 v1.5 and is considered insecure. Due to this, Benthos does not support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them by using the following command (as the current Snowflake docs suggest):

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
```

If you have an existing key encrypted with PKCS#5 v1.5, you can re-encrypt it with PKCS#5 v2.0 using this command:

```bash
openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8
```

Please consult the https://linux.die.net/man/1/pkcs8[pkcs8 command documentation^] for details on PKCS#5 algorithms.
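As a sketch, assuming the encrypted key generated above is stored at `./rsa_key.p8` and its passphrase is exported in a `SNOWFLAKE_KEY_PASS` environment variable (both names are illustrative), the output can then use key pair authentication roughly as follows:

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    # PKCS#5 v2.0 encrypted private key generated with the command above.
    private_key_file: ./rsa_key.p8
    # Passphrase read from the environment so it never lives in the config file.
    private_key_pass: "${SNOWFLAKE_KEY_PASS}"
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
```

Reading the passphrase through environment variable interpolation keeps the secret out of the config itself, as recommended on the xref:configuration:secrets.adoc[secrets page].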
== Batching

It's common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an xref:components:processors/archive.adoc[`archive`] and/or xref:components:processors/compress.adoc[`compress`] processor.

For the optimal batch size, please consult the Snowflake https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html[documentation^].
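As a sketch, assuming batches should be concatenated and gzip-compressed by Benthos itself rather than by the driver via `AUTO`, the batching policy could look roughly like this (batch sizes and credentials are illustrative):

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    # GZIP tells the output that messages arrive pre-compressed, so the batch is
    # joined and compressed by the processors below instead of by the driver.
    compression: GZIP
    batching:
      count: 100
      period: 5s
      processors:
        - archive:
            format: concatenate
        - compress:
            algorithm: gzip
```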
== Snowpipe

Given a table called `BENTHOS_TBL` with one column of type `variant`:

```sql
CREATE OR REPLACE TABLE BENTHOS_DB.PUBLIC.BENTHOS_TBL(RECORD variant)
```

and the following `BENTHOS_PIPE` Snowpipe:

```sql
CREATE OR REPLACE PIPE BENTHOS_DB.PUBLIC.BENTHOS_PIPE
  AUTO_INGEST = FALSE
  AS COPY INTO BENTHOS_DB.PUBLIC.BENTHOS_TBL
  FROM (SELECT * FROM @%BENTHOS_TBL)
  FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO)
```

you can configure Benthos to use the implicit table stage `@%BENTHOS_TBL` as the `stage` and `BENTHOS_PIPE` as the `snowpipe`. In this case, you must set `compression` to `AUTO` and, if using message batching, you'll need to configure an xref:components:processors/archive.adoc[`archive`] processor with the `concatenate` format. Since the `compression` is set to `AUTO`, the https://github.com/snowflakedb/gosnowflake[gosnowflake^] client library will compress the messages automatically so you don't need to add a xref:components:processors/compress.adoc[`compress`] processor for message batches.

If you add `STRIP_OUTER_ARRAY = TRUE` in your Snowpipe `FILE_FORMAT` definition, then you must use `json_array` instead of `concatenate` as the archive processor format.

NOTE: Only Snowpipes with `FILE_FORMAT` `TYPE` `JSON` are currently supported.
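Putting this together, a minimal sketch for the table and pipe defined above (credentials and key path are illustrative) might look as follows:

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    # Implicit table stage of BENTHOS_TBL plus the Snowpipe defined above.
    stage: "@%BENTHOS_TBL"
    snowpipe: BENTHOS_PIPE
    # AUTO lets the gosnowflake client compress the uploaded file, matching
    # COMPRESSION = AUTO in the pipe's FILE_FORMAT.
    compression: AUTO
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
```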
== Snowpipe troubleshooting

Snowpipe https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html[provides^] the `insertReport` and `loadHistoryScan` REST API endpoints which can be used to get information about recent Snowpipe calls. In order to query them, you'll first need to generate a valid JWT token for your Snowflake account. There are two methods for doing so:

- Using the `snowsql` https://docs.snowflake.com/en/user-guide/snowsql.html[utility^]:

```bash
snowsql --private-key-path rsa_key.p8 --generate-jwt -a <account> -u <user>
```

- Using the Python `sql-api-generate-jwt` https://docs.snowflake.com/en/developer-guide/sql-api/authenticating.html#generating-a-jwt-in-python[utility^]:

```bash
python3 sql-api-generate-jwt.py --private_key_file_path=rsa_key.p8 --account=<account> --user=<user>
```

Once you successfully generate a JWT token and store it into the `JWT_TOKEN` environment variable, then you can, for example, query the `insertReport` endpoint using `curl`:

```bash
curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://<account>.snowflakecomputing.com/v1/data/pipes/<database>.<schema>.<snowpipe>/insertReport"
```

If you need to pass in a valid `requestId` to any of these Snowpipe REST API endpoints, you can set a xref:guides:bloblang/functions.adoc#uuid_v4[uuid_v4()] string in a metadata field called `request_id`, log it via the xref:components:processors/log.adoc[`log`] processor and then configure `request_id: ${! @request_id }`. Alternatively, you can xref:components:logger/about.adoc[enable debug logging] and Benthos will print the Request IDs that it sends to Snowpipe.
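A sketch of the first approach, with the log message and credentials as illustrative placeholders:

```yaml
pipeline:
  processors:
    # Attach a fresh UUID to every message so it can be correlated with
    # Snowpipe's insertReport / loadHistoryScan output later on.
    - mapping: |
        meta request_id = uuid_v4()
    - log:
        level: INFO
        message: 'Snowpipe request ID: ${! @request_id }'

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: rsa_key.p8
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    snowpipe: BENTHOS_PIPE
    # Reuse the logged UUID as the Snowpipe request ID.
    request_id: ${! @request_id }
```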
== General troubleshooting

The underlying https://github.com/snowflakedb/gosnowflake[`gosnowflake` driver^] requires write access to the default directory used for temporary files. Please consult the https://pkg.go.dev/os#TempDir[`os.TempDir`^] docs for details on how to change this directory via environment variables.

A silent failure can occur due to https://github.com/snowflakedb/gosnowflake/issues/701[this issue^], where the underlying https://github.com/snowflakedb/gosnowflake[`gosnowflake` driver^] doesn't return an error and doesn't log a failure if it can't figure out the current username. One way to trigger this behavior is by running Benthos in a Docker container with a non-existent user ID (such as `--user 1000:1000`).

== Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in flight messages (or message batches) with the field `max_in_flight`.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more xref:configuration:batching.adoc[in this doc].

== Examples

[tabs]
======
Kafka / realtime brokers::
+
--
Upload message batches from realtime brokers such as Kafka, persisting the batch partition and offsets in the stage path and file name similarly to the https://docs.snowflake.com/en/user-guide/kafka-connector-ts.html#step-1-view-the-copy-history-for-the-table[Kafka Connector scheme^], and call Snowpipe to load them into a table. When batching is configured at the input level, it is done per-partition.

```yaml
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - foo
    consumer_group: benthos
    batching:
      count: 10
      period: 3s
      processors:
        - mapping: |
            meta kafka_start_offset = meta("kafka_offset").from(0)
            meta kafka_end_offset = meta("kafka_offset").from(-1)
            meta batch_timestamp = if batch_index() == 0 { now() }
        - mapping: |
            meta batch_timestamp = if batch_index() != 0 { meta("batch_timestamp").from(0) }

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos/BENTHOS_TBL/${! @kafka_partition }
    file_name: ${! @kafka_start_offset }_${! @kafka_end_offset }_${! meta("batch_timestamp") }
    upload_parallel_threads: 4
    compression: NONE
    snowpipe: BENTHOS_PIPE
```

--
No compression::
+
--
Upload concatenated messages into a `.json` file to a table stage without calling Snowpipe.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
```

--
Parquet format with snappy compression::
+
--
Upload concatenated messages into a `.parquet` file to a table stage without calling Snowpipe.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    file_extension: parquet
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - parquet_encode:
            schema:
              - name: ID
                type: INT64
              - name: CONTENT
                type: BYTE_ARRAY
            default_compression: snappy
```

--
Automatic compression::
+
--
Upload concatenated messages compressed automatically into a `.gz` archive file to a table stage without calling Snowpipe.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: AUTO
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
```

--
DEFLATE compression::
+
--
Upload concatenated messages compressed into a `.deflate` archive file to a table stage and call Snowpipe to load them into a table.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: DEFLATE
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
        - mapping: |
            root = content().compress("zlib")
```

--
RAW_DEFLATE compression::
+
--
Upload concatenated messages compressed into a `.raw_deflate` archive file to a table stage and call Snowpipe to load them into a table.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: RAW_DEFLATE
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
        - mapping: |
            root = content().compress("flate")
```

--
======

== Fields

=== `account`

Account name, which is the same as the https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#where-are-account-identifiers-used[Account Identifier^]. However, when using an https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier[Account Locator^], the Account Identifier is formatted as `<account_locator>.<region_id>.<cloud>` and this field needs to be populated using the `<account_locator>` part.

*Type*: `string`

=== `region`

Optional region field which needs to be populated when using an https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier[Account Locator^] and it must be set to the `<region_id>` part of the Account Identifier (`<account_locator>.<region_id>.<cloud>`).

*Type*: `string`

```yml
# Examples

region: us-west-2
```

=== `cloud`

Optional cloud platform field which needs to be populated when using an https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier[Account Locator^] and it must be set to the `<cloud>` part of the Account Identifier (`<account_locator>.<region_id>.<cloud>`).

*Type*: `string`

```yml
# Examples

cloud: aws

cloud: gcp

cloud: azure
```

=== `user`

Username.

*Type*: `string`

=== `password`

An optional password.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====

*Type*: `string`

=== `private_key`

The private SSH key. `private_key_pass` is required when using encrypted keys.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====

*Type*: `string`

=== `private_key_file`

The path to a file containing the private SSH key. `private_key_pass` is required when using encrypted keys.

*Type*: `string`

=== `private_key_pass`

An optional private SSH key passphrase.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====

*Type*: `string`

=== `role`

Role.

*Type*: `string`

=== `database`

Database.

*Type*: `string`

=== `warehouse`

Warehouse.

*Type*: `string`

=== `schema`

Schema.

*Type*: `string`

=== `stage`

Stage name. Use either one of the https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html[supported^] stage types.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

=== `path`

Stage path.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `""`

=== `file_name`

Stage file name. Will be equal to the Request ID if not set or empty.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `""`
Requires version v4.12.0 or newer

=== `file_extension`

Stage file extension. Will be derived from the configured `compression` if not set or empty.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `""`
Requires version v4.12.0 or newer

```yml
# Examples

file_extension: csv

file_extension: parquet
```

=== `upload_parallel_threads`

Specifies the number of threads to use for uploading files.

*Type*: `int`

*Default*: `4`

=== `compression`

Compression type.

*Type*: `string`

*Default*: `"AUTO"`

|===
| Option | Summary

| `AUTO`
| Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON. Default `file_extension`: `gz`.

| `DEFLATE`
| Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950). Default `file_extension`: `deflate`.

| `GZIP`
| Messages must be pre-compressed using the gzip algorithm. Default `file_extension`: `gz`.

| `NONE`
| No compression is applied and messages must contain plain-text JSON. Default `file_extension`: `json`.

| `RAW_DEFLATE`
| Messages must be pre-compressed using the flate algorithm (without header, RFC1951). Default `file_extension`: `raw_deflate`.

| `ZSTD`
| Messages must be pre-compressed using the Zstandard algorithm. Default `file_extension`: `zst`.

|===

=== `request_id`

Request ID. Will be assigned a random UUID (v4) string if not set or empty.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `""`
Requires version v4.12.0 or newer

=== `snowpipe`

An optional Snowpipe name. Use the `<snowpipe>` part from `<database>.<schema>.<snowpipe>`. `private_key` or `private_key_file` must be set when using this feature.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

=== `client_session_keep_alive`

Enable Snowflake keepalive mechanism to prevent the client session from expiring after 4 hours (error 390114).

*Type*: `bool`

*Default*: `false`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].

*Type*: `object`

```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. If `0` disables count based batching.

*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0` disables size based batching.

*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.

*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.

*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

*Type*: `array`

```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```

=== `max_in_flight`

The maximum number of parallel message batches to have in flight at any given time.

*Type*: `int`

*Default*: `1`