# De-duplicate messages within each batch, without de-duplicating across batches.
#
# The first message of every batch is tagged with a freshly generated ID, and
# that ID is prefixed to each message's content to form the dedupe key. Equal
# payloads in the same batch therefore share a key, while equal payloads in
# different batches do not.

pipeline:
  processors:
    # Only the message at batch index 0 gets a `batch_tag`; the `if` has no
    # else branch, so the metadata is left unset on every other message.
    - mapping: |
        meta batch_tag = if batch_index() == 0 { nanoid(10) }

    # `.from(0)` reads `batch_tag` from the first message of the batch, so
    # every message in the batch builds its key from the same tag.
    - dedupe:
        cache: local
        key: ${! meta("batch_tag").from(0) + content() }

cache_resources:
  # In-memory cache backing the dedupe processor above (referenced as `local`).
  - label: local
    memory:
      default_ttl: 1m

tests:
  - name: de-duplicate by batches

    # Two batches whose payloads overlap both within and across the batches.
    input_batches:
      - - { content: '1' }
        - { content: '2' }
        - { content: '3' }
        - { content: '4' }
        - { content: '3' }
        - { content: '3' }
        - { content: '3' }

      - - { content: '4' }
        - { content: '1' }
        - { content: '1' }
        - { content: '3' }
        - { content: '4' }
        - { content: '4' }
        - { content: '2' }
        - { content: '1' }

    # Each batch keeps the first occurrence of every payload, in arrival order.
    # The second batch still emits values already seen in the first batch.
    output_batches:
      - - { content_equals: 1 }
        - { content_equals: 2 }
        - { content_equals: 3 }
        - { content_equals: 4 }

      - - { content_equals: 4 }
        - { content_equals: 1 }
        - { content_equals: 3 }
        - { content_equals: 2 }