redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

The `fallback` output should stop trying to send messages to unreachable outputs #1210

Open mihaitodor opened 2 years ago

mihaitodor commented 2 years ago

The following config keeps trying to send messages to the `kafka_franz` output when it's unavailable, even though a `stdout` fallback output is configured:

```yaml
input:
  generate:
    # Emits three messages with ids 1, 2 and 3.
    mapping: |
      root.id = count("foo_counter1")
    interval: 0s
    count: 3

pipeline:
  threads: 1
  processors:
    # ids 1 and 2 are mapped; id 3 throws, which flags that message as errored.
    - bloblang: |
        root = match {
          this.id == 1 => {"msg": "foo"}
          this.id == 2 => {"msg": "bar"}
          _ => throw("UNKNOWN ID")
        }

        root.id = this.id

output:
  switch:
    retry_until_success: true
    cases:
      # Errored messages go straight to stdout.
      - check: errored()
        output:
          stdout:
            codec: lines
          processors:
            - log:
                message: "errored message: ${! json()}"

      # Everything else should go to Kafka, falling back to stdout if Kafka fails.
      - output:
          fallback:
            - kafka_franz:
                seed_brokers:
                  - localhost:6666 # assumed unreachable in this reproduction
                topic: test
            - stdout:
                codec: lines
          processors:
            - log:
                message: "successful message: ${! json()}"

logger:
  level: DEBUG
  format: json
  add_timestamp: true
  static_fields:
    "@service": benthos

shutdown_timeout: 3s # Default 20s
```

From the Discord chat, it looks like the solution would be an enhancement that detects connection loss, so that `fallback` can move on to the next output instead of retrying the unreachable one.

Later edit: It might also be nice to have an `exponential_backoff` field that tells `fallback` to wait progressively longer before trying again to send a message to an output that is experiencing connectivity issues.

twmb commented 2 years ago

Please let me know if there's something I can help with here!

theRealWardo commented 1 year ago

I dug into this a lot more, and I suspect this is a non-trivial change, since it fundamentally alters the way Benthos thinks about connectivity errors and its reconnection loop. Given the scope, I moved the discussion over to https://github.com/benthosdev/benthos/discussions/754#discussioncomment-4261704

I also have a sketch of what it took to fix what may be a similar situation I ran into with `redis_list`, in case it helps anyone trying to fix a `fallback` that isn't falling back.

Jeffail commented 1 year ago

Thanks @theRealWardo, I totally agree with your main points there. I think the fundamental problem has been that different client libraries like to obfuscate connectivity and cancellation in their own ways, so it's very difficult to have a consistent mechanism in there. My priority up until now has been to ensure we have guaranteed delivery, which some client libraries are worryingly eager to hide.

However, I think now would be a good time to start aiming for greater consistency in cancellation and deadlines during delivery. A massive unblocker for this has been putting contexts everywhere: before v4 that wasn't the case, but now we have a way of delivering both cancellation and deadlines to components, including processors (which still needs gluing together).

I've marked this issue with our very first v5 label. The purpose of that label isn't to defer this until the next major version bump (hopefully we will never need one), but it's an indicator that this is a fundamental improvement to Benthos internals, significant enough that we would need an answer for it before considering any new major version bump or new plugin API.

theRealWardo commented 1 year ago

That was an impressively quick reply @Jeffail - I really appreciate that! Thank you!

Do you have a proposed starting point for this work? Let me know how you like to do these kinds of changes.

I lean towards a draft PR that adds some candidate configuration options to the `Config` struct, along with an accompanying docs change describing the fields in detail, of course, and possibly a configuration guide as well?

Happy to help if you give me a shove in the right direction. 😄