= redpanda_common
:type: output
:status: beta
:categories: ["Services"]

////
     THIS FILE IS AUTOGENERATED!

     To make changes, edit the corresponding source file under:

     https://github.com/redpanda-data/connect/tree/main/internal/impl/.

     And:

     https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Sends data to a Redpanda (Kafka) broker, using credentials defined in a common top-level `redpanda` config block.

[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
output:
  label: ""
  redpanda_common:
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: ${! meta("partition") } # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
output:
  label: ""
  redpanda_common:
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: ${! meta("partition") } # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    timestamp: ${! timestamp_unix() } # No default (optional)
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

--
======

== Examples

[tabs]
======
Simple Output::
+
--

Data is generated and written to the topic `bar`, targeting the cluster configured within the `redpanda` block at the bottom. This is useful because TLS and SASL only need to be configured once, even when the configuration contains multiple inputs and outputs.

```yaml
input:
  generate:
    interval: 1s
    mapping: 'root.name = fake("name")'

pipeline:
  processors:
    - mutation: |
        root.id = uuid_v4()
        root.loud_name = this.name.uppercase()

output:
  redpanda_common:
    topic: bar
    key: ${! @id }

redpanda:
  seed_brokers: [ "127.0.0.1:9092" ]
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-512
      password: bar
      username: foo
```

--
======

== Fields

=== `topic`

A topic to write messages to.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

=== `key`

An optional key to populate for each message.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

=== `partition`

An optional explicit partition to set for each message. This field is only relevant when the `partitioner` is set to `manual`. The provided interpolation string must resolve to a valid integer.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

```yml
# Examples

partition: ${! meta("partition") }
```

=== `metadata`

Determine which (if any) metadata values should be added to messages as headers.

*Type*: `object`

=== `metadata.include_prefixes`

Provide a list of explicit metadata key prefixes to match against.

*Type*: `array`

*Default*: `[]`

```yml
# Examples

include_prefixes:
  - foo_
  - bar_

include_prefixes:
  - kafka_

include_prefixes:
  - content-
```

=== `metadata.include_patterns`

Provide a list of explicit metadata key regular expression (re2) patterns to match against.

*Type*: `array`

*Default*: `[]`

```yml
# Examples

include_patterns:
  - .*

include_patterns:
  - _timestamp_unix$
```

=== `timestamp`

An optional timestamp to set for each message. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

```yml
# Examples

timestamp: ${! timestamp_unix() }

timestamp: ${! metadata("kafka_timestamp_unix") }
```

=== `max_in_flight`

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

*Type*: `int`

*Default*: `10`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].

*Type*: `object`

```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

=== `batching.count`

The number of messages at which the batch should be flushed. If `0`, count-based batching is disabled.

*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

The number of bytes at which the batch should be flushed. If `0`, size-based batching is disabled.

*Type*: `int`

*Default*: `0`

=== `batching.period`

A period after which an incomplete batch should be flushed regardless of its size.

*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.

*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

*Type*: `array`

```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```
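
The batching fields are documented one at a time above, so it may help to see them combined in one output. The following is a minimal sketch, not a recommended configuration: the topic name `events` and the thresholds are hypothetical, and it assumes a top-level `redpanda` block configured as in the Simple Output example.

```yml
output:
  redpanda_common:
    topic: events # hypothetical topic name
    max_in_flight: 10
    batching:
      count: 100 # flush once 100 messages have accumulated...
      period: 1s # ...or after 1s, whichever comes first
      processors:
        # Join the batch into a single message, one original message per line
        - archive:
            format: lines
```

With the `archive` processor in place, each flush should produce one message containing the whole batch rather than one message per input. Removing the `processors` list would instead write the batched messages individually.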