warpstreamlabs / bento

Fancy stream processing made operationally mundane. This repository is a fork of the original project before the license was changed.
https://warpstreamlabs.github.io/bento/
Other
963 stars 55 forks source link

Redis streams kind of replay nacks even when `auto_replay_nacks` is `false` #84

Open bssyousefi opened 2 months ago

bssyousefi commented 2 months ago

I have a simple bento application which reads data from a Redis stream and put the data into a SQL db. I want to make sure that if for some reasons a record faced an error while trying to put the data into db, the record stays not acknowledged on Redis streams so I can easily rerun the application after I fixed the issue with the SQL insertion. As far as I understand by default if there is an error in the output no acknowledgement will take place, however the record causing the error will keep replaying the whole pipeline. To avoid this, there is a parameter called auto_replay_nacks that could help me. Based on my understanding, if I give value false to auto_replay_nacks I expect no failed record repeating the pipeline and also the failed record being not acknowledged. However, when I'm testing the scenario, the failed record keeps repeating the whole pipeline regardless of the value of parameter auto_replay_nacks.

As I went through the code, I found out that when the parameter auto_replay_nacks is false this part of the code is adding the failed record to the pendingMsgs, so the failed record repeats forever. https://github.com/warpstreamlabs/bento/blob/17147d6ff09b296f23a002662478f51730a8fc04/internal/impl/redis/input_streams.go#L407

return msg.payload, func(rctx context.Context, res error) error {
    f res != nil {
        r.pendingMsgsMut.Lock()
        r.pendingMsgs = append(r.pendingMsgs, msg)
        r.pendingMsgsMut.Unlock()
    } else {
        r.addAsyncAcks(msg.stream, msg.id)
    }
    return nil
}, nil

I'm not sure if my expectation from auto_replay_nacks is valid or not. I would appreciate it if you kindly let me know how I can avoid the repeating in this scenario.

For replicating the case you could use a simple input-output configuration, for which you have a record that fails in the output section.

gregfurman commented 2 months ago

Hey. Thanks for opening this issue!

I've done a bit of a deep dive here and think see the confusion, and agree that the auto-replaying functionality in the context of a component like a redis_stream is maybe not that clear. Disclaimer though: I'm no Redis expert!

So auto_replay_nacks is useful for when an ack is called with an error, but the ack handler doesn't have any error handling capabiltiies defined.

https://github.com/warpstreamlabs/bento/blob/17147d6ff09b296f23a002662478f51730a8fc04/public/service/input_auto_retry_batched.go#L27-L33

Fundamentally, however, a redis stream requires that messages be acknowledged (via an XACK command) before it can be removed from the Redis stream. The code you linked above shows Bento adding non-errored messages to an async queue to be ACK'd asynchronously r.addAsyncAcks(msg.stream, msg.id).

On the Redis stream side, an XACK will all evict a message from a consumer group's Pending Entries List.

From the Redis streams docs:

Consuming a message, however, requires an explicit acknowledgment using a specific command. Redis interprets the acknowledgment as: this message was correctly processed so it can be evicted from the consumer group.

Now back to your original problem. You're correct in saying that when downstream messages from a redis_stream error out, that pendingMsgs are appended to with the failed message(s). Also, notice how the ack function only does this when ack is passed an error via the res parameter.

If you do not want these messages to be re-queued, you'll want to ack them in some way (or potentially even catch the error) -- signalling that Bento asynchronously discard them from the redis stream.

There are a couple of ways to do this so I'd recommend giving a look at the Error Handling docs. Specifically, the Dropping Failed Messages section could be relevant since:

This will remove any failed messages from a batch. Furthermore, dropping a message will propagate an acknowledgement (also known as "ack") upstream to the pipeline's input.

Let me know if this makes sense. Otherwise, I'm happy to also chat on the Discord if it's easier 😄

bssyousefi commented 2 months ago

Thanks @gregfurman for the explanation. so I guess by nacking in bento, we mean not acknowledging the proper receipt to the input source. Am I right? As a matter of fact, I want bento to process the record just once even if there was some error. However, what I'm looking for is a mechanism in which bento doesn't ack in case of error and also avoid replaying the record, so I can have a list of records in redis streams NOT acknowledged and pending. I tried to call reject in output in order to nack the record in case of error and to avoid repeating the record with error I tried to use mapping: root = deleted() before reject to prevent bento from adding the record to pendingMsgs. However, it didn't work as I wished. I guess the flow stops after dropping the record by mapping command and reject is not being called. Is there any solution to avoid calling ack in case of error without repeating the record?