= snowflake_streaming
:type: output
:status: experimental
:categories: ["Services"]

////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Ingest data into Snowflake using Snowpipe Streaming.

Introduced in version 4.39.0.

[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
    schema_evolution:
      enabled: false # No default (required)
      new_column_type_mapping: |-
        root = match this.value.type() {
          this == "string" => "STRING"
          this == "bytes" => "BINARY"
          this == "number" => "DOUBLE"
          this == "bool" => "BOOLEAN"
          this == "timestamp" => "TIMESTAMP"
          _ => "VARIANT"
        }
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
    schema_evolution:
      enabled: false # No default (required)
      new_column_type_mapping: |-
        root = match this.value.type() {
          this == "string" => "STRING"
          this == "bytes" => "BINARY"
          this == "number" => "DOUBLE"
          this == "bool" => "BOOLEAN"
          this == "timestamp" => "TIMESTAMP"
          _ => "VARIANT"
        }
    build_parallelism: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64
    channel_prefix: "" # No default (optional)
```

--
======

Ingest data into Snowflake using Snowpipe Streaming.

[%header,format=dsv]
|===
Snowflake column type:Allowed format in Benthos
CHAR, VARCHAR:string
BINARY:[]byte
NUMBER:any numeric type, string
FLOAT:any numeric type
BOOLEAN:bool, any numeric type, string parsable according to `strconv.ParseBool`
TIME,DATE,TIMESTAMP:unix or RFC 3339 (with nanoseconds) timestamps
VARIANT,ARRAY,OBJECT:any data type is converted into JSON
GEOGRAPHY,GEOMETRY:Not supported
|===

For TIMESTAMP, TIME and DATE columns, you can parse different string formats using a Bloblang `mapping`; a sketch is shown at the end of this section.

Authentication can be configured using an https://docs.snowflake.com/en/user-guide/key-pair-auth[RSA Key Pair^].

There are https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#limitations[limitations^] on which data types can be loaded into Snowflake using this method.
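As a minimal sketch of the timestamp parsing mentioned above (the `event_time` field name and its `YYYY-MM-DD HH:MM:SS` input format are illustrative assumptions, not part of the plugin), a `mapping` can convert a custom string format into a timestamp value before it reaches a TIMESTAMP column:

```yml
output:
  snowflake_streaming:
    # ... account, user, table and key fields omitted for brevity ...
    mapping: |
      root = this
      # Parse a hypothetical "2024-11-05 17:30:00" style string into a
      # timestamp value using strptime-style directives.
      root.event_time = this.event_time.ts_strptime("%Y-%m-%d %H:%M:%S")
```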
== Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in flight messages (or message batches) with the field `max_in_flight`.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more xref:configuration:batching.adoc[in this doc].

It is recommended that each batch results in at least 16MiB of compressed output being written to Snowflake. You can monitor the output batch size using the `snowflake_compressed_output_size_bytes` metric.
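As a rough sketch only (the numbers below assume a hypothetical ~4:1 compression ratio and will depend entirely on your data), a batching policy along these lines aims batches at that recommended size:

```yml
output:
  snowflake_streaming:
    # ... account, user, table and key fields omitted for brevity ...
    batching:
      # Assumption: ~64MiB of raw input compresses to at least the recommended
      # 16MiB. Verify against the snowflake_compressed_output_size_bytes
      # metric and adjust.
      byte_size: 67108864 # 64MiB
      period: "30s" # flush incomplete batches so latency stays bounded
```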
== Examples

[tabs]
======
Ingesting data from Redpanda::
+
--

How to ingest data from Redpanda with consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake.

```yaml
input:
  kafka_franz:
    seed_brokers: ["redpanda.example.com:9092"]
    topics: ["my_topic_going_to_snow"]
    consumer_group: "redpanda_connect_to_snowflake"
    tls: {enabled: true}
    sasl:
      - mechanism: SCRAM-SHA-256
        username: MY_USER_NAME
        password: "${TODO}"
pipeline:
  processors:
    - schema_registry_decode:
        url: "redpanda.example.com:8081"
        basic_auth:
          enabled: true
          username: MY_USER_NAME
          password: "${TODO}"
output:
  snowflake_streaming:
    # By default only a single channel per output table is allowed.
    # If we want multiple Benthos streams writing data, then we need
    # a unique channel prefix per stream. We'll use the host name to
    # get unique prefixes in this example.
    channel_prefix: "snowflake-channel-for-${HOST}"
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"
    schema_evolution:
      enabled: true
```

--
HTTP Sidecar to push data to Snowflake::
+
--

This example demonstrates how to create an HTTP server input that can receive HTTP PUT requests with JSON payloads, which are buffered locally and then written to Snowflake in batches.

NOTE: This example uses a buffer to respond to the HTTP request immediately, so it's possible that failures to deliver data could result in data loss. See the documentation about xref:components:buffers/memory.adoc[buffers] for more information, or remove the buffer entirely to respond to the HTTP request only once the data is written to Snowflake.

```yaml
input:
  http_server:
    path: /snowflake
buffer:
  memory:
    # Max in flight data before applying backpressure
    limit: 524288000 # 500MiB
    # Batching policy, influences how large the generated files sent to Snowflake are
    batch_policy:
      enabled: true
      byte_size: 33554432 # 32MiB
      period: "10s"
output:
  snowflake_streaming:
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"
```

--
======

== Fields

=== `account`

The Snowflake https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier[Account name^], which should be formatted as `<ORG_NAME>-<ACCOUNT_NAME>`, where `<ORG_NAME>` is the name of your Snowflake organization and `<ACCOUNT_NAME>` is the unique name of your account within your organization.

*Type*: `string`

```yml
# Examples

account: ORG-ACCOUNT
```

=== `user`

The user to run the Snowpipe Stream as. See https://docs.snowflake.com/en/user-guide/admin-user-management[Snowflake Documentation^] on how to create a user.

*Type*: `string`

=== `role`

The role for the `user` field. The role must have the https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#required-access-privileges[required privileges^] to call the Snowpipe Streaming APIs. See https://docs.snowflake.com/en/user-guide/admin-user-management#user-roles[Snowflake Documentation^] for more information about roles.

*Type*: `string`

```yml
# Examples

role: ACCOUNTADMIN
```

=== `database`

The Snowflake database to ingest data into.

*Type*: `string`

=== `schema`

The Snowflake schema to ingest data into.

*Type*: `string`

=== `table`

The Snowflake table to ingest data into.

*Type*: `string`

=== `private_key`

The PEM-encoded private RSA key to use for authenticating with Snowflake. Either this field or `private_key_file` must be specified.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly; read our xref:configuration:secrets.adoc[secrets page for more info].
====

*Type*: `string`

=== `private_key_file`

The file to load the private RSA key from. This should be a `.p8` PEM-encoded file. Either this field or `private_key` must be specified.

*Type*: `string`

=== `private_key_pass`

The RSA key passphrase, if the RSA key is encrypted.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly; read our xref:configuration:secrets.adoc[secrets page for more info].
====

*Type*: `string`

=== `mapping`

A Bloblang mapping to execute on each message.

*Type*: `string`

=== `init_statement`

Optional SQL statements to execute immediately upon the first connection. This is a useful way to initialize tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

*Type*: `string`

```yml
# Examples

init_statement: |2
  CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);

init_statement: |2
  ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
  ALTER TABLE t1 ADD COLUMN a2 NUMBER;
```

=== `schema_evolution`

Options to control schema evolution as new columns are added to the pipeline.

*Type*: `object`

=== `schema_evolution.enabled`

Whether schema evolution is enabled.

*Type*: `bool`

=== `schema_evolution.new_column_type_mapping`

The mapping function from Benthos type to column type in Snowflake. Overriding this allows you to customize the Snowflake column type if you have specific knowledge about the data types in use. This mapping should result in the `root` variable being assigned a string with the data type for the new column in Snowflake.

The input to this mapping is an object with the value and the name of the new column, for example: `{"value": 42.3, "name": "new_data_field"}`.

*Type*: `string`

*Default*: `"root = match this.value.type() {\n this == \"string\" => \"STRING\"\n this == \"bytes\" => \"BINARY\"\n this == \"number\" => \"DOUBLE\"\n this == \"bool\" => \"BOOLEAN\"\n this == \"timestamp\" => \"TIMESTAMP\"\n _ => \"VARIANT\"\n}"`
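For instance, here is a minimal sketch of an override (the `amount` column name is an illustrative assumption) that creates a NUMBER column for that one field and otherwise stays close to the default behaviour:

```yml
output:
  snowflake_streaming:
    # ... other fields omitted for brevity ...
    schema_evolution:
      enabled: true
      new_column_type_mapping: |-
        root = match {
          # Hypothetical column that we know always holds whole numbers.
          this.name == "amount" => "NUMBER"
          this.value.type() == "string" => "STRING"
          this.value.type() == "bytes" => "BINARY"
          this.value.type() == "number" => "DOUBLE"
          this.value.type() == "bool" => "BOOLEAN"
          this.value.type() == "timestamp" => "TIMESTAMP"
          _ => "VARIANT"
        }
```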
=== `build_parallelism`

The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.

*Type*: `int`

*Default*: `1`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].

*Type*: `object`

```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. Set to `0` to disable count-based batching.

*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. Set to `0` to disable size-based batching.

*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.

*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.

*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

*Type*: `array`

```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```

=== `max_in_flight`

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

*Type*: `int`

*Default*: `64`

=== `channel_prefix`

The prefix to use when creating a channel name. Duplicate channel names will result in errors and prevent multiple instances of Benthos from writing at the same time. By default this will create a channel name that is based on the table FQN, so there will only be a single stream per table.

At most `max_in_flight` channels will be opened.

NOTE: There is a limit of 10,000 streams per table. If you need more than 10,000 streams, please reach out to Snowflake support.

*Type*: `string`
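For example, a minimal sketch of two instances sharing one table, assuming (as in the Redpanda example above) that a `HOST` environment variable resolves to a different value on each instance:

```yml
output:
  snowflake_streaming:
    # ... account, user, table and key fields omitted for brevity ...
    # Each instance interpolates a different ${HOST}, so channel names never
    # collide. Without a unique prefix, the default table-based channel name
    # would cause errors as soon as a second instance started writing.
    channel_prefix: "snowflake-channel-for-${HOST}"
    max_in_flight: 64 # up to 64 channels may be opened under this prefix
```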