= broker :type: input :status: stable :categories: ["Utility"] //// THIS FILE IS AUTOGENERATED! To make changes, edit the corresponding source file under: https://github.com/redpanda-data/connect/tree/main/internal/impl/. And: https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl //// // © 2024 Redpanda Data Inc. component_type_dropdown::[] Allows you to combine multiple inputs into a single stream of data, where each input will be read in parallel. [tabs] ====== Common:: + -- ```yml # Common config fields, showing default values input: label: "" broker: inputs: [] # No default (required) batching: count: 0 byte_size: 0 period: "" check: "" ``` -- Advanced:: + -- ```yml # All config fields, showing default values input: label: "" broker: copies: 1 inputs: [] # No default (required) batching: count: 0 byte_size: 0 period: "" check: "" processors: [] # No default (optional) ``` -- ====== A broker type is configured with its own list of input configurations and a field to specify how many copies of the list of inputs should be created. Adding more input types allows you to combine streams from multiple sources into one. For example, reading from both RabbitMQ and Kafka: ```yaml input: broker: copies: 1 inputs: - amqp_0_9: urls: - amqp://guest:guest@localhost:5672/ consumer_tag: benthos-consumer queue: benthos-queue # Optional list of input specific processing steps processors: - mapping: | root.message = this root.meta.link_count = this.links.length() root.user.age = this.user.age.number() - kafka: addresses: - localhost:9092 client_id: benthos_kafka_input consumer_group: benthos_consumer_group topics: [ benthos_stream:0 ] ``` If the number of copies is greater than zero the list will be copied that number of times. For example, if your inputs were of type foo and bar, with 'copies' set to '2', you would end up with two 'foo' inputs and two 'bar' inputs. == Batching It's possible to configure a xref:configuration:batching.adoc#batch-policy[batch policy] with a broker using the `batching` fields. When doing this the feeds from all child inputs are combined. Some inputs do not support broker based batching and specify this in their documentation. == Processors It is possible to configure xref:components:processors/about.adoc[processors] at the broker level, where they will be applied to _all_ child inputs, as well as on the individual child inputs. If you have processors at both the broker level _and_ on child inputs then the broker processors will be applied _after_ the child nodes processors. == Fields === `copies` Whatever is specified within `inputs` will be created this many times. *Type*: `int` *Default*: `1` === `inputs` A list of inputs to create. *Type*: `array` === `batching` Allows you to configure a xref:configuration:batching.adoc[batching policy]. *Type*: `object` ```yml # Examples batching: byte_size: 5000 count: 0 period: 1s batching: count: 10 period: 1s batching: check: this.contains("END BATCH") count: 0 period: 1m ``` === `batching.count` A number of messages at which the batch should be flushed. If `0` disables count based batching. *Type*: `int` *Default*: `0` === `batching.byte_size` An amount of bytes at which the batch should be flushed. If `0` disables size based batching. *Type*: `int` *Default*: `0` === `batching.period` A period in which an incomplete batch should be flushed regardless of its size. *Type*: `string` *Default*: `""` ```yml # Examples period: 1s period: 1m period: 500ms ``` === `batching.check` A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch. *Type*: `string` *Default*: `""` ```yml # Examples check: this.type == "end_of_transaction" ``` === `batching.processors` A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. *Type*: `array` ```yml # Examples processors: - archive: format: concatenate processors: - archive: format: lines processors: - archive: format: json_array ```