= cached :type: processor :status: experimental :categories: ["Utility"] //// THIS FILE IS AUTOGENERATED! To make changes, edit the corresponding source file under: https://github.com/redpanda-data/connect/tree/main/internal/impl/. And: https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl //// // © 2024 Redpanda Data Inc. component_type_dropdown::[] Cache the result of applying one or more processors to messages identified by a key. If the key already exists within the cache the contents of the message will be replaced with the cached result instead of applying the processors. This component is therefore useful in situations where an expensive set of processors need only be executed periodically. Introduced in version 4.3.0. ```yml # Config fields, showing default values label: "" cached: cache: "" # No default (required) skip_on: errored() # No default (optional) key: my_foo_result # No default (required) ttl: "" # No default (optional) processors: [] # No default (required) ``` The format of the data when stored within the cache is a custom and versioned schema chosen to balance performance and storage space. It is therefore not possible to point this processor to a cache that is pre-populated with data that this processor has not created itself. == Examples [tabs] ====== Cached Enrichment:: + -- In the following example we want to we enrich messages consumed from Kafka with data specific to the origin topic partition, we do this by placing an `http` processor within a `branch`, where the HTTP URL contains interpolation functions with the topic and partition in the path. However, it would be inefficient to make this HTTP request for every single message as the result is consistent for all data of a given topic partition. We can solve this by placing our enrichment call within a `cached` processor where the key contains the topic and partition, resulting in messages that originate from the same topic/partition combination using the cached result of the prior. ```yaml pipeline: processors: - branch: processors: - cached: key: '${! meta("kafka_topic") }-${! meta("kafka_partition") }' cache: foo_cache processors: - mapping: 'root = ""' - http: url: http://example.com/enrichment/${! meta("kafka_topic") }/${! meta("kafka_partition") } verb: GET result_map: 'root.enrichment = this' cache_resources: - label: foo_cache memory: # Disable compaction so that cached items never expire compaction_interval: "" ``` -- Periodic Global Enrichment:: + -- In the following example we enrich all messages with the same data obtained from a static URL with an `http` processor within a `branch`. However, we expect the data from this URL to change roughly every 10 minutes, so we configure a `cached` processor with a static key (since this request is consistent for all messages) and a TTL of `10m`. ```yaml pipeline: processors: - branch: request_map: 'root = ""' processors: - cached: key: static_foo cache: foo_cache ttl: 10m processors: - http: url: http://example.com/get/foo.json verb: GET result_map: 'root.foo = this' cache_resources: - label: foo_cache memory: {} ``` -- ====== == Fields === `cache` The cache resource to read and write processor results from. *Type*: `string` === `skip_on` A condition that can be used to skip caching the results from the processors. *Type*: `string` ```yml # Examples skip_on: errored() ``` === `key` A key to be resolved for each message, if the key already exists in the cache then the cached result is used, otherwise the processors are applied and the result is cached under this key. The key could be static and therefore apply generally to all messages or it could be an interpolated expression that is potentially unique for each message. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. *Type*: `string` ```yml # Examples key: my_foo_result key: ${! this.document.id } key: ${! meta("kafka_key") } key: ${! meta("kafka_topic") } ``` === `ttl` An optional expiry period to set for each cache entry. Some caches only have a general TTL and will therefore ignore this setting. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. *Type*: `string` === `processors` The list of processors whose result will be cached. *Type*: `array`