= gcp_cloud_storage
:type: output
:status: beta
:categories: ["Services","GCP"]

////
     THIS FILE IS AUTOGENERATED!

     To make changes, edit the corresponding source file under:

        https://github.com/redpanda-data/connect/tree/main/internal/impl/.

     And:

        https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Sends message parts as objects to a Google Cloud Storage bucket. Each object is uploaded with the path specified by the `path` field.

Introduced in version 3.43.0.

[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
output:
  label: ""
  gcp_cloud_storage:
    bucket: "" # No default (required)
    path: ${!counter()}-${!timestamp_unix_nano()}.txt
    content_type: application/octet-stream
    collision_mode: overwrite
    timeout: 3s
    credentials_json: ""
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
output:
  label: ""
  gcp_cloud_storage:
    bucket: "" # No default (required)
    path: ${!counter()}-${!timestamp_unix_nano()}.txt
    content_type: application/octet-stream
    content_encoding: ""
    collision_mode: overwrite
    chunk_size: 16777216
    timeout: 3s
    credentials_json: ""
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

--
======

In order to have a different path for each object you should use function interpolations described in xref:configuration:interpolation.adoc#bloblang-queries[Bloblang queries], which are calculated per message of a batch.

== Metadata

Metadata fields on messages are sent as headers. To mutate these values (or remove them), check out the xref:configuration:metadata.adoc[metadata docs].

== Credentials

By default Benthos will use a shared credentials file when connecting to GCP services. You can find out more in xref:guides:cloud/gcp.adoc[].

== Batching

It's common to want to upload messages to Google Cloud Storage as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an xref:components:processors/archive.adoc[`archive`] and/or xref:components:processors/compress.adoc[`compress`] processor.

For example, if we wished to upload messages as a .tar.gz archive of documents, we could achieve that with the following config:

```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.tar.gz
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
```

Alternatively, if we wished to upload JSON documents as a single large document containing an array of objects, we could do that with:

```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    batching:
      count: 100
      processors:
        - archive:
            format: json_array
```

== Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field `max_in_flight`.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more xref:configuration:batching.adoc[in this doc].
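For example, a sketch that raises the in-flight limit and forms modest batches might look like this (the bucket name and numbers are illustrative, not recommendations):

```yml
output:
  gcp_cloud_storage:
    bucket: my-example-bucket # Placeholder bucket name
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    max_in_flight: 128        # Allow more concurrent uploads (the default is 64)
    batching:
      count: 100              # Flush after 100 messages
      period: 5s              # Or after 5 seconds, whichever comes first
```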
== Fields

=== `bucket`

The bucket to upload messages to.

*Type*: `string`

=== `path`

The path of each message to upload.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `"${!counter()}-${!timestamp_unix_nano()}.txt"`

```yml
# Examples

path: ${!counter()}-${!timestamp_unix_nano()}.txt

path: ${!meta("kafka_key")}.json

path: ${!json("doc.namespace")}/${!json("doc.id")}.json
```

=== `content_type`

The content type to set for each object.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `"application/octet-stream"`

=== `content_encoding`

An optional content encoding to set for each object.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `""`

=== `collision_mode`

Determines how file path collisions should be dealt with.

*Type*: `string`

*Default*: `"overwrite"`
Requires version 3.53.0 or newer

|===
| Option | Summary

| `append`
| Append the message bytes to the original file.

| `error-if-exists`
| Return an error; this is the equivalent of a nack.

| `ignore`
| Do not modify the original file; the new data will be dropped.

| `overwrite`
| Replace the existing file with the new one.

|===

=== `chunk_size`

An optional chunk size which controls the maximum number of bytes of the object that the Writer will attempt to send to the server in a single request. If ChunkSize is set to zero, chunking will be disabled.

*Type*: `int`

*Default*: `16777216`

=== `timeout`

The maximum period to wait on an upload before abandoning it and reattempting.

*Type*: `string`

*Default*: `"3s"`

```yml
# Examples

timeout: 1s

timeout: 500ms
```

=== `credentials_json`

An optional field to set a Google Service Account credentials JSON.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly; read our xref:configuration:secrets.adoc[secrets page] for more info.
====

This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

*Default*: `""`

=== `max_in_flight`

The maximum number of message batches to have in flight at a given time. Increase this to improve throughput.

*Type*: `int`

*Default*: `64`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].

*Type*: `object`

```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. If `0`, count-based batching is disabled.

*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0`, size-based batching is disabled.

*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.

*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.

*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```
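For instance, this check can be combined with the other batching fields. The following sketch (values are illustrative only) ends a batch when a message marks the end of a transaction, but also flushes once a size threshold is reached:

```yml
batching:
  check: this.type == "end_of_transaction" # Close the batch on this marker
  byte_size: 5000000                       # Also flush once roughly 5MB has accumulated
```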
=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch; therefore, splitting the batch into smaller batches using these processors is a no-op.

*Type*: `array`

```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```
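Tying these fields together, the following sketch (the bucket name, sizes, and content type are illustrative, not recommendations) flushes each batch as a single gzip-compressed, newline-delimited object and records the encoding on it:

```yml
output:
  gcp_cloud_storage:
    bucket: my-example-bucket # Placeholder bucket name
    path: ${!counter()}-${!timestamp_unix_nano()}.ndjson.gz
    content_type: application/x-ndjson # Assumed media type for newline-delimited JSON
    content_encoding: gzip
    batching:
      count: 500  # Flush after 500 messages
      period: 30s # Or after 30 seconds, whichever comes first
      processors:
        - archive:
            format: lines # Join the batch with newline delimiters
        - compress:
            algorithm: gzip
```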