input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ comments ]
          consumer_group: benthos_comments_group

      - kafka:
          addresses: [ TODO ]
          topics: [ comments_retry ]
          consumer_group: benthos_comments_group

        processors:
          - for_each:
            # Calculate the time remaining until the next retry attempt and
            # sleep for that duration, e.g. a message last attempted 600s ago
            # sleeps for 3000s, so failed messages are retried roughly once an
            # hour. This sleep blocks the topic 'comments_retry' but NOT
            # 'comments', because both topics are consumed independently and
            # these processors only apply to the 'comments_retry' input.
            - sleep:
                duration: '${! 3600 - ( timestamp_unix() - meta("last_attempted").number() ) }s'

pipeline:
  processors:
    - try:
      # Perform both hydration and caching within a for_each block, which
      # ensures that each message of a batch is cached before the next message
      # is hydrated. This way, hydration still works when a message's parent
      # is within the same batch.
      - for_each:
        # Attempt to obtain the parent event from the cache (if the ID exists).
        - branch:
            request_map: root = this.comment.parent_id | deleted()
            processors:
              - cache:
                  operator: get
                  resource: hydration_cache
                  key: '${! content() }'
            # And if successful copy it into the field `article`.
            result_map: 'root.article = this.article'

        # Reduce the comment to only the fields we wish to cache.
        - branch:
            request_map: |
              root.comment.id = this.comment.id
              root.article = this.article
            processors:
              # Store the reduced comment in our cache.
              - cache:
                  operator: set
                  resource: hydration_cache
                  key: '${! json("comment.id") }'
                  value: '${! content() }'
            # No `result_map` since we don't need to map back into the original message.

      # If we've reached this point then both processors succeeded.
      - bloblang: 'meta output_topic = "comments_hydrated"'

    - catch:
      # If we reach here then a processing stage failed.
      - bloblang: |
          meta output_topic = "comments_retry"
          meta last_attempted = timestamp_unix()

# Send resulting documents either to our hydrated topic or to the retry topic.
output:
  kafka:
    addresses: [ TODO ]
    topic: '${! meta("output_topic") }'

cache_resources:
  - label: hydration_cache
    memory:
      init_values:
        123foo: |
          {
            "article": {
              "id": "123foo",
              "title": "Dope article",
              "content": "this is a totally dope article"
            }
          }

tests:
  - name: Basic hydration
    target_processors: /pipeline/processors
    input_batch:
      - content: |
          {
            "type": "comment",
            "comment": {
              "id": "456bar",
              "parent_id": "123foo",
              "content": "this article sucks"
            },
            "user": {
              "id": "user2"
            }
          }
      - content: |
          {
            "type": "comment",
            "comment": {
              "id": "789baz",
              "parent_id": "456bar",
              "content": "this article is great, actually"
            },
            "user": {
              "id": "user3"
            }
          }
    output_batches:
      - - json_equals: {
            "type": "comment",
            "article": {
              "id": "123foo",
              "title": "Dope article",
              "content": "this is a totally dope article"
            },
            "comment": {
              "id": "456bar",
              "parent_id": "123foo",
              "content": "this article sucks"
            },
            "user": {
              "id": "user2"
            }
          }
        - json_equals: {
            "type": "comment",
            "article": {
              "id": "123foo",
              "title": "Dope article",
              "content": "this is a totally dope article"
            },
            "comment": {
              "id": "789baz",
              "parent_id": "456bar",
              "content": "this article is great, actually"
            },
            "user": {
              "id": "user3"
            }
          }
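
# A quick way to validate and exercise this pipeline is the Benthos CLI, which
# ships with a config linter and a unit-test runner that executes the `tests`
# section above. A minimal sketch, assuming the config is saved as
# hydration.yaml (the filename here is illustrative):
#
#   benthos lint ./hydration.yaml   # check the config for mistakes
#   benthos test ./hydration.yaml   # run the 'Basic hydration' test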