numaproj / numaflow

Kubernetes-native platform to run massively parallel data/streaming jobs
https://numaflow.numaproj.io/
Apache License 2.0

Expose Retry Count and Retry Timeout For Sink #1951

Closed yhl25 closed 2 weeks ago

yhl25 commented 4 weeks ago

Summary

Currently, the sink retries indefinitely at a fixed interval of 1 ms, which can cause problems when the sink is in a bad state. We should give the user an option to configure the maximum retry timeout.

We should also expose the retry count so that, based on the number of retries, the user can either drop the message or write it to the fallback sink.

Note: Retry is optimistic. The actual number of retries can exceed the user-configured retry count because the counter is reset each time the pod restarts.
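
To make the proposal concrete, here is a minimal sketch of a bounded, fixed-interval retry. It is not Numaflow's implementation; `write_with_retry`, its parameters, and the returned strings are hypothetical names chosen for illustration. The retry counter is a local variable, which is why a pod restart would start counting from zero again.

```python
import time
from typing import Callable


def write_with_retry(
    write: Callable[[], bool],
    max_retries: int,
    interval_s: float = 0.001,   # mirrors the current fixed 1 ms interval
    on_exhausted: str = "drop",  # hypothetical: "drop" or "fallback"
) -> str:
    """Call `write` once, then retry up to `max_retries` times.

    Returns "written" on success, otherwise the `on_exhausted` action.
    The counter lives only in memory, so it is lost on restart.
    """
    retries = 0
    while True:
        if write():
            return "written"
        if retries >= max_retries:
            # Retry budget used up: let the caller drop or use the fallback sink.
            return on_exhausted
        retries += 1
        time.sleep(interval_s)
```

With `max_retries=3`, a sink that always fails is called four times in total: the first attempt plus three retries.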

Use Cases

When would you use this?


### Tasks
- [x] Add retryStrategy to Sink CRD
- [x] Fixed retry for Pipeline
- [x] Fixed retry for MonoVertex
- [ ] ~Exponential retry for Pipeline~
- [ ] ~Exponential retry for MonoVertex~
- [ ] https://github.com/numaproj/numaflow/issues/2016
- [ ] ~Expose retry count from sinks~

whynowy commented 3 weeks ago

vertices:
    - name: output
      scale:
        min: 1
      sink:
        retryStrategy:
          # Optional
          backoff:
            duration: 1s # Optional, defaults to 1ms
            factor: xx # Optional, defaults to 1
            jitter: xx # Optional, defaults to 0
            steps: 3 # Optional, number of retries (excluding the first attempt), defaults to 0
          # Optional, defaults to retry; in that case, steps is ignored
          onFailure: retry|fallback|drop
        udsink:
          container:
            image: quay.io/numaio/numaflow-go/fb-sink-log:stable
            imagePullPolicy: Always
        fallback:
          udsink:
            container:
              image: quay.io/numaio/numaflow-go/fb-sink-log:stable
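
As a rough illustration of how the `backoff` fields above could combine, the sketch below computes the wait before each retry. The semantics are an assumption, not something this thread specifies: the delay starts at `duration`, is multiplied by `factor` after each retry, and `jitter` adds a random extra of up to `jitter * delay`. The function name `backoff_delays` is hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(
    duration_s: float = 0.001,  # assumed default of 1 ms
    factor: float = 1.0,        # 1 keeps the interval constant
    jitter: float = 0.0,        # 0 disables randomization
    steps: int = 0,             # number of retries after the first attempt
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait (in seconds) before each of the `steps` retries."""
    rng = rng or random.Random()
    delays = []
    current = duration_s
    for _ in range(steps):
        # Assumed jitter rule: add a random amount in [0, jitter * current).
        delays.append(current + current * jitter * rng.random())
        current *= factor
    return delays


print(backoff_delays(duration_s=1.0, factor=2.0, steps=3))  # → [1.0, 2.0, 4.0]
```

With the defaults (`factor: 1`, `jitter: 0`), every retry waits the same `duration`, which corresponds to the fixed retry listed in the tasks.
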

kohlisid commented 3 weeks ago

The default backoff uses constant-interval retries.

1) Default onFailure -> infinite
2) If the fallback spec is configured and onFailure != fallback -> error
3) If the fallback spec is configured and onFailure is not defined -> use fallback
4) Drop only when onFailure = drop
5) If onFailure = infinite and steps != 0 -> error
6) If onFailure = infinite and steps = 0 (default) -> overwrite to max
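
The rules above can be sketched as a small validation function. This is only one reading of them, not the actual controller code: `resolve_retry_strategy` and `MAX_STEPS` are hypothetical names, `MAX_STEPS` is a stand-in for "max", and rules 5 and 6 are assumed to apply to the resolved value of onFailure (including the default). The case of onFailure = fallback without a fallback spec is not covered by the rules and is left unhandled here.

```python
import sys
from typing import Optional, Tuple

MAX_STEPS = sys.maxsize  # hypothetical stand-in for "max"


def resolve_retry_strategy(
    on_failure: Optional[str], steps: int, has_fallback: bool
) -> Tuple[str, int]:
    """Return the effective (onFailure, steps) or raise ValueError."""
    if on_failure not in (None, "infinite", "fallback", "drop"):
        raise ValueError(f"unknown onFailure: {on_failure!r}")

    if on_failure is None:
        # Rules 1 and 3: default is infinite, unless a fallback is configured.
        on_failure = "fallback" if has_fallback else "infinite"

    if has_fallback and on_failure != "fallback":
        # Rule 2: a configured fallback must actually be used.
        raise ValueError("fallback is configured but onFailure is not fallback")

    if on_failure == "infinite":
        if steps != 0:
            # Rule 5: a retry limit contradicts infinite retry.
            raise ValueError("steps must be 0 when onFailure is infinite")
        # Rule 6: overwrite the default steps with the maximum.
        steps = MAX_STEPS

    # Rule 4 holds implicitly: "drop" is only returned when set explicitly.
    return on_failure, steps
```

For example, `resolve_retry_strategy(None, 3, True)` yields `("fallback", 3)`, while `resolve_retry_strategy("infinite", 3, False)` raises an error under rule 5.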