= dedupe
:type: processor
:status: stable
:categories: ["Utility"]

////
     THIS FILE IS AUTOGENERATED!

     To make changes, edit the corresponding source file under:

     https://github.com/redpanda-data/connect/tree/main/internal/impl/.

     And:

     https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Deduplicates messages by storing a key value in a cache using the `add` operator. If the key already exists within the cache the message is dropped.

```yml
# Config fields, showing default values
label: ""
dedupe:
  cache: "" # No default (required)
  key: ${! meta("kafka_key") } # No default (required)
  drop_on_err: true
```

Caches must be configured as resources. For more information, check out the xref:components:caches/about.adoc[cache documentation].
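As a minimal sketch of such a resource, the following declares a `redis` cache for this processor to target. The `dedupecache` label and the URL are placeholders, not defaults:

```yaml
# Hypothetical cache resource; the label and URL are placeholders.
cache_resources:
  - label: dedupecache
    redis:
      url: redis://localhost:6379
      default_ttl: 60s # expire dedupe keys so the cache doesn't grow unbounded
```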
When using this processor with an output target that might fail you should always wrap the output within an indefinite xref:components:outputs/retry.adoc[`retry`] block. This ensures that failed messages are retried at the output rather than re-consumed from the input during outages, since a re-consumed message would find its key already in the cache and be dropped, resulting in message loss.
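For instance, a sketch of an output wrapped in an indefinite `retry` block might look like the following, where the `http_client` target is a stand-in for any output that might fail:

```yaml
output:
  retry:
    # With max_retries left at its default of 0 the wrapped
    # output is retried indefinitely.
    output:
      http_client:
        url: https://example.com/post # placeholder sink
        verb: POST
```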
== Batch deduplication

This processor acts on individual messages only. In order to deduplicate a batch (or window) of messages instead, use the xref:components:processors/cache.adoc#examples[`cache` processor].
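The pattern there, sketched below with placeholder resource and key names, relies on the `add` cache operator failing for keys that already exist, with a `mapping` processor then deleting any message flagged with that error:

```yaml
pipeline:
  processors:
    - cache:
        resource: batchcache # placeholder cache resource
        operator: add        # fails when the key already exists
        key: '${! json("message.id") }' # placeholder key path
        value: "t"
    # Drop messages whose key was already present in the cache.
    - mapping: root = if errored() { deleted() }
```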
== Delivery guarantees

Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Benthos pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Benthos instance (or a server crash, etc).

This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Benthos pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implementing idempotent behaviour at the edge of your stream pipelines.
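As an illustrative sketch of that in-memory approach, assuming a Kafka topic that producers already partition by the deduplication key (the broker address, topic, and labels below are placeholders):

```yaml
# Each horizontally scaled instance consumes its own share of
# partitions, so duplicates of a given key always reach the same
# instance and its local in-memory cache.
input:
  kafka:
    addresses: [ localhost:9092 ] # placeholder broker
    topics: [ events ]            # placeholder topic
    consumer_group: dedupe_group  # placeholder group

pipeline:
  processors:
    - dedupe:
        cache: local
        key: ${! meta("kafka_key") }

cache_resources:
  - label: local
    memory:
      default_ttl: 5m
```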
== Fields

=== `cache`

The xref:components:caches/about.adoc[`cache` resource] to target with this processor.

*Type*: `string`

=== `key`

An interpolated string yielding the key to deduplicate by for each message.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

```yml
# Examples

key: ${! meta("kafka_key") }

key: ${! content().hash("xxhash64") }
```

=== `drop_on_err`

Whether messages should be dropped when the cache returns a general error such as a network issue.

*Type*: `bool`

*Default*: `true`
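If delivery matters more to you than deduplication during a cache outage, a sketch with this field disabled might look like the following (the cache label is a placeholder):

```yaml
pipeline:
  processors:
    - dedupe:
        cache: dedupecache # placeholder cache resource
        key: ${! meta("kafka_key") }
        # Keep messages when the cache errors, accepting possible
        # duplicates rather than dropping messages during an outage.
        drop_on_err: false
```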
== Examples

[tabs]
======
Deduplicate based on Kafka key::
+
--

The following configuration demonstrates a pipeline that deduplicates messages based on the Kafka key.

```yaml
pipeline:
  processors:
    - dedupe:
        cache: keycache
        key: ${! meta("kafka_key") }

cache_resources:
  - label: keycache
    memory:
      default_ttl: 60s
```

--
======