= branch :type: processor :status: stable :categories: ["Composition"] //// THIS FILE IS AUTOGENERATED! To make changes, edit the corresponding source file under: https://github.com/redpanda-data/connect/tree/main/internal/impl/. And: https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl //// // © 2024 Redpanda Data Inc. component_type_dropdown::[] The `branch` processor allows you to create a new request message via a xref:guides:bloblang/about.adoc[Bloblang mapping], execute a list of processors on the request messages, and, finally, map the result back into the source message using another mapping. ```yml # Config fields, showing default values label: "" branch: request_map: "" processors: [] # No default (required) result_map: "" ``` This is useful for preserving the original message contents when using processors that would otherwise replace the entire contents. == Metadata Metadata fields that are added to messages during branch processing will not be automatically copied into the resulting message. In order to do this you should explicitly declare in your `result_map` either a wholesale copy with `meta = metadata()`, or selective copies with `meta foo = metadata("bar")` and so on. It is also possible to reference the metadata of the origin message in the `result_map` using the xref:guides:bloblang/about.adoc#metadata[`@` operator]. == Error handling If the `request_map` fails the child processors will not be executed. If the child processors themselves result in an (uncaught) error then the `result_map` will not be executed. If the `result_map` fails the message will remain unchanged. Under any of these conditions standard xref:configuration:error_handling.adoc[error handling methods] can be used in order to filter, DLQ or recover the failed messages. == Conditional branching If the root of your request map is set to `deleted()` then the branch processors are skipped for the given message, this allows you to conditionally branch messages. == Fields === `request_map` A xref:guides:bloblang/about.adoc[Bloblang mapping] that describes how to create a request payload suitable for the child processors of this branch. If left empty then the branch will begin with an exact copy of the origin message (including metadata). *Type*: `string` *Default*: `""` ```yml # Examples request_map: |- root = { "id": this.doc.id, "content": this.doc.body.text } request_map: |- root = if this.type == "foo" { this.foo.request } else { deleted() } ``` === `processors` A list of processors to apply to mapped requests. When processing message batches the resulting batch must match the size and ordering of the input batch, therefore filtering, grouping should not be performed within these processors. *Type*: `array` === `result_map` A xref:guides:bloblang/about.adoc[Bloblang mapping] that describes how the resulting messages from branched processing should be mapped back into the original payload. If left empty the origin message will remain unchanged (including metadata). *Type*: `string` *Default*: `""` ```yml # Examples result_map: |- meta foo_code = metadata("code") root.foo_result = this result_map: |- meta = metadata() root.bar.body = this.body root.bar.id = this.user.id result_map: root.raw_result = content().string() result_map: |- root.enrichments.foo = if metadata("request_failed") != null { throw(metadata("request_failed")) } else { this } result_map: |- # Retain only the updated metadata fields which were present in the origin message meta = metadata().filter(v -> @.get(v.key) != null) ``` == Examples [tabs] ====== HTTP Request:: + -- This example strips the request message into an empty body, grabs an HTTP payload, and places the result back into the original message at the path `image.pull_count`: ```yaml pipeline: processors: - branch: request_map: 'root = ""' processors: - http: url: https://hub.docker.com/v2/repositories/jeffail/benthos verb: GET headers: Content-Type: application/json result_map: root.image.pull_count = this.pull_count # Example input: {"id":"foo","some":"pre-existing data"} # Example output: {"id":"foo","some":"pre-existing data","image":{"pull_count":1234}} ``` -- Non Structured Results:: + -- When the result of your branch processors is unstructured and you wish to simply set a resulting field to the raw output use the content function to obtain the raw bytes of the resulting message and then coerce it into your value type of choice: ```yaml pipeline: processors: - branch: request_map: 'root = this.document.id' processors: - cache: resource: descriptions_cache key: ${! content() } operator: get result_map: root.document.description = content().string() # Example input: {"document":{"id":"foo","content":"hello world"}} # Example output: {"document":{"id":"foo","content":"hello world","description":"this is a cool doc"}} ``` -- Lambda Function:: + -- This example maps a new payload for triggering a lambda function with an ID and username from the original message, and the result of the lambda is discarded, meaning the original message is unchanged. ```yaml pipeline: processors: - branch: request_map: '{"id":this.doc.id,"username":this.user.name}' processors: - aws_lambda: function: trigger_user_update # Example input: {"doc":{"id":"foo","body":"hello world"},"user":{"name":"fooey"}} # Output matches the input, which is unchanged ``` -- Conditional Caching:: + -- This example caches a document by a message ID only when the type of the document is a foo: ```yaml pipeline: processors: - branch: request_map: | meta id = this.id root = if this.type == "foo" { this.document } else { deleted() } processors: - cache: resource: TODO operator: set key: ${! @id } value: ${! content() } ``` -- ======