benthosdev / benthos

Fancy stream processing made operationally mundane
https://www.benthos.dev
MIT License
7.68k stars 752 forks source link

Support a general nak_delay for all msgs and a header to set the nak_delay individually for each msg on NATS Jetstream input #2556

Open mfamador opened 2 weeks ago

mfamador commented 2 weeks ago

Add an optional parameter to make NATS Jetstream to delay the redelivery of all msgs when negatively acknowledged. It can be used as a retry mechanism when a transient error occurs.

input:
  nats_jetstream:
    urls:
      - localhost:4222
    subject: foo
    durable: bar
    nak_delay: 5m

pipeline:
  processors:
  -  . . . 

output:
  switch:
    cases:
    - check: 'errored()'
      output:
        reject: "rejecting due to processing error: ${! error() }"
    - output:
        resource: output_ok

Additionally, we can use a header on each msg to define the unix epoch timestamp until when we want to delay the msg processing. By default is nak_delay_until but we can set it with another name using the parameter nak_delay_until_header

input:
  nats_jetstream:
    subject: foo
    durable: bar

pipeline:
  processors:
  - mapping: throw("transient error")

output:
  switch:
    cases:
    - check: '@nak_delay_until.number() - timestamp_unix() > 0'
      output:
        reject: "not time to process it yet ${! @time_to_process }"

    - check: 'errored()'
      output:
        nats_jetstream:
          subject: foo
          headers:
            nak_delay_until: ${! (timestamp_unix() + @num_retries * 10).int64() }
            num_retries: ${! @num_retries }
        processors:
        - mapping: meta num_retries = @num_retries.number().or(0) + 1

    - output:
        drop: { } # our regular output
        processors: