= gcp_pubsub :type: output :status: stable :categories: ["Services","GCP"] //// THIS FILE IS AUTOGENERATED! To make changes, edit the corresponding source file under: https://github.com/redpanda-data/connect/tree/main/internal/impl/. And: https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl //// // © 2024 Redpanda Data Inc. component_type_dropdown::[] Sends messages to a GCP Cloud Pub/Sub topic. xref:configuration:metadata.adoc[Metadata] from messages are sent as attributes. [tabs] ====== Common:: + -- ```yml # Common config fields, showing default values output: label: "" gcp_pubsub: project: "" # No default (required) credentials_json: "" topic: "" # No default (required) endpoint: "" max_in_flight: 64 count_threshold: 100 delay_threshold: 10ms byte_threshold: 1000000 metadata: exclude_prefixes: [] batching: count: 0 byte_size: 0 period: "" check: "" ``` -- Advanced:: + -- ```yml # All config fields, showing default values output: label: "" gcp_pubsub: project: "" # No default (required) credentials_json: "" topic: "" # No default (required) endpoint: "" ordering_key: "" # No default (optional) max_in_flight: 64 count_threshold: 100 delay_threshold: 10ms byte_threshold: 1000000 publish_timeout: 1m0s metadata: exclude_prefixes: [] flow_control: max_outstanding_bytes: -1 max_outstanding_messages: 1000 limit_exceeded_behavior: block batching: count: 0 byte_size: 0 period: "" check: "" processors: [] # No default (optional) ``` -- ====== For information on how to set up credentials, see https://cloud.google.com/docs/authentication/production[this guide^]. == Troubleshooting If you're consistently seeing `Failed to send message to gcp_pubsub: context deadline exceeded` error logs without any further information it is possible that you are encountering https://github.com/benthosdev/benthos/issues/1042, which occurs when metadata values contain characters that are not valid utf-8. This can frequently occur when consuming from Kafka as the key metadata field may be populated with an arbitrary binary value, but this issue is not exclusive to Kafka. If you are blocked by this issue then a work around is to delete either the specific problematic keys: ```yaml pipeline: processors: - mapping: | meta kafka_key = deleted() ``` Or delete all keys with: ```yaml pipeline: processors: - mapping: meta = deleted() ``` == Fields === `project` The project ID of the topic to publish to. *Type*: `string` === `credentials_json` An optional field to set Google Service Account Credentials json. [CAUTION] ==== This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. ==== *Type*: `string` *Default*: `""` === `topic` The topic to publish to. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. *Type*: `string` === `endpoint` An optional endpoint to override the default of `pubsub.googleapis.com:443`. This can be used to connect to a region specific pubsub endpoint. For a list of valid values, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_regional_endpoints[this document^]. *Type*: `string` *Default*: `""` ```yml # Examples endpoint: us-central1-pubsub.googleapis.com:443 endpoint: us-west3-pubsub.googleapis.com:443 ``` === `ordering_key` The ordering key to use for publishing messages. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. *Type*: `string` === `max_in_flight` The maximum number of messages to have in flight at a given time. Increasing this may improve throughput. *Type*: `int` *Default*: `64` === `count_threshold` Publish a pubsub buffer when it has this many messages *Type*: `int` *Default*: `100` === `delay_threshold` Publish a non-empty pubsub buffer after this delay has passed. *Type*: `string` *Default*: `"10ms"` === `byte_threshold` Publish a batch when its size in bytes reaches this value. *Type*: `int` *Default*: `1000000` === `publish_timeout` The maximum length of time to wait before abandoning a publish attempt for a message. *Type*: `string` *Default*: `"1m0s"` ```yml # Examples publish_timeout: 10s publish_timeout: 5m publish_timeout: 60m ``` === `metadata` Specify criteria for which metadata values are sent as attributes, all are sent by default. *Type*: `object` === `metadata.exclude_prefixes` Provide a list of explicit metadata key prefixes to be excluded when adding metadata to sent messages. *Type*: `array` *Default*: `[]` === `flow_control` For a given topic, configures the PubSub client's internal buffer for messages to be published. *Type*: `object` === `flow_control.max_outstanding_bytes` Maximum size of buffered messages to be published. If less than or equal to zero, this is disabled. *Type*: `int` *Default*: `-1` === `flow_control.max_outstanding_messages` Maximum number of buffered messages to be published. If less than or equal to zero, this is disabled. *Type*: `int` *Default*: `1000` === `flow_control.limit_exceeded_behavior` Configures the behavior when trying to publish additional messages while the flow controller is full. The available options are block (default), ignore (disable), and signal_error (publish results will return an error). *Type*: `string` *Default*: `"block"` Options: `ignore` , `block` , `signal_error` . === `batching` Configures a batching policy on this output. While the PubSub client maintains its own internal buffering mechanism, preparing larger batches of messages can further trade-off some latency for throughput. *Type*: `object` ```yml # Examples batching: byte_size: 5000 count: 0 period: 1s batching: count: 10 period: 1s batching: check: this.contains("END BATCH") count: 0 period: 1m ``` === `batching.count` A number of messages at which the batch should be flushed. If `0` disables count based batching. *Type*: `int` *Default*: `0` === `batching.byte_size` An amount of bytes at which the batch should be flushed. If `0` disables size based batching. *Type*: `int` *Default*: `0` === `batching.period` A period in which an incomplete batch should be flushed regardless of its size. *Type*: `string` *Default*: `""` ```yml # Examples period: 1s period: 1m period: 500ms ``` === `batching.check` A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch. *Type*: `string` *Default*: `""` ```yml # Examples check: this.type == "end_of_transaction" ``` === `batching.processors` A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. *Type*: `array` ```yml # Examples processors: - archive: format: concatenate processors: - archive: format: lines processors: - archive: format: json_array ```