dapr / components-contrib

Community driven, reusable components for distributed apps

Kafka pubsub message gets dropped despite default retry forever policy #3529

Closed passuied closed 2 months ago

passuied commented 2 months ago

Expected Behavior

An invalid Kafka pubsub message is retried forever when the default retry policy is in place.

Actual Behavior

The invalid Kafka pubsub message is dropped when the pod terminates.

Steps to Reproduce the Problem

  1. Use the default retry policy (retry forever).
  2. Publish a valid message to topic my_topic.
  3. Publish an invalid message and another valid message to the same topic.
  4. Set up a consumer of my_topic that returns 500 when consuming the invalid message (a minimal sketch follows this list).
  5. Observe that Dapr retries processing the message over and over.
  6. Terminate the pod.
  7. The following message shows up in the log: "Too many failed attempts at processing Kafka message:..."
  8. Upon restart, the consumer moves on to the next message, i.e. the failed message was dropped...
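
For steps 4-5, a minimal sketch of such a consumer app. This assumes a Dapr declarative subscription that routes my_topic to the /my_topic endpoint; the port, route, and the notion of an "invalid" payload are illustrative, not taken from the issue:

```go
// Minimal sketch of the consumer in step 4. Assumes a Dapr declarative
// subscription routes topic my_topic to the /my_topic endpoint; the port and
// the "invalid" check are illustrative only.
package main

import (
	"io"
	"log"
	"net/http"
	"strings"
)

func main() {
	http.HandleFunc("/my_topic", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil || strings.Contains(string(body), "invalid") {
			// A non-2xx status tells Dapr the delivery failed, so the
			// configured retry policy keeps redelivering the message.
			http.Error(w, "cannot process message", http.StatusInternalServerError)
			return
		}
		log.Printf("processed message: %s", body)
		w.WriteHeader(http.StatusOK) // 2xx acknowledges the message
	})
	log.Fatal(http.ListenAndServe(":6002", nil))
}
```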

Release Note

RELEASE NOTE: FIX bug in Kafka pubsub where a poison-pill message gets dropped upon pod termination despite a retry-forever policy

passuied commented 2 months ago

Based on research, this issue is the result of this code:

  • retry.NotifyRecover() will retry per the retry policy unless a Permanent error is encountered or the context is Done (see doRetryNotify()).
  • When bubbled up, the ConsumeClaim() code will just assume the error is a permanent error and return nil, causing the message to be ack'ed.

The solution is not super clear here, as there is a lot of ping-pong between the dapr and components-contrib code. An expert eye would be welcome. Specifically, it is not clear what acks the message when a permanent error occurs in general... The only place that seems to ack the message is in doCallBack(), and only when the handler doesn't return an error...
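
To make the failure mode concrete, here is a rough sketch of that retry behaviour written directly against cenkalti/backoff/v4 and sarama types (the retry helper in question builds on the same mechanism); the names and structure are illustrative, not the component's actual code:

```go
// Sketch only: "retry forever unless the error is Permanent or the context is
// Done". Not the component's actual code; names are illustrative.
package kafkaexample

import (
	"log"
	"time"

	"github.com/IBM/sarama"
	backoff "github.com/cenkalti/backoff/v4"
)

func processWithRetry(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage,
	handle func(*sarama.ConsumerMessage) error) error {
	eb := backoff.NewExponentialBackOff()
	eb.MaxElapsedTime = 0 // never give up, matching a retry-forever policy

	// Tying the backoff to the session context means a terminating pod cancels
	// the retry loop with a context error even though the message was never
	// successfully handled.
	b := backoff.WithContext(eb, session.Context())

	return backoff.RetryNotify(
		func() error {
			// Wrapping an error in backoff.Permanent() would stop the retries.
			return handle(msg)
		},
		b,
		func(err error, d time.Duration) {
			log.Printf("error processing Kafka message, retrying in %s: %v", d, err)
		},
	)
}
```

The caller only gets back a non-nil error; if it treats every non-nil error as permanent, a context cancellation during shutdown is indistinguishable from giving up on a poison pill, which is the "Too many failed attempts" path described above.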

yaron2 commented 2 months ago

> Based on research, this issue is the result of this code:
>
>   • retry.NotifyRecover() will retry per the retry policy unless a Permanent error is encountered or the context is Done (see doRetryNotify()).
>   • When bubbled up, the ConsumeClaim() code will just assume the error is a permanent error and return nil, causing the message to be ack'ed.
>
> The solution is not super clear here, as there is a lot of ping-pong between the dapr and components-contrib code. An expert eye would be welcome. Specifically, it is not clear what acks the message when a permanent error occurs in general... The only place that seems to ack the message is in doCallBack(), and only when the handler doesn't return an error...

cc @halspang can you take a look?

passuied commented 2 months ago

> cc @halspang can you take a look?

I can provide a log of the event that got dropped... Dapr retried the same message for 18h (as expected); then, upon pod termination, the "Too many failed attempts at processing Kafka message:..." log showed up and the message got skipped...

Please note this happened in version 1.13.4, as our production cluster hasn't been updated to 1.14.x yet (we were going to upgrade last week but are now waiting for 1.14.2).

yaron2 commented 2 months ago

I can reproduce this as well locally, but I suspect something is off with my local Docker-based Kafka, as this reproduces all the way back to Dapr 1.10.x (I didn't test further).

passuied commented 2 months ago

@yaron2 I think I figured it out. The problem occurs because a valid message exists afterwards: retry.NotifyRecover() exits, but the next loop iteration processes the next message instead of exiting... The solution should be simple: just move the case session.Context().Done() above so the loop exits directly instead of processing the next message!
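
Roughly, the shape being proposed (a simplified sketch, not the component's actual ConsumeClaim). One caveat: in Go, case order inside a single select does not grant priority when both channels are ready, so the shutdown check is written here as a separate non-blocking select before pulling the next message:

```go
package kafkaexample

import (
	"log"

	"github.com/IBM/sarama"
)

// consumeClaim sketches the proposed loop ordering; it is not the component's
// actual code. handleWithRetry is assumed to retry per the resiliency policy
// and to mark the offset only when the app handler succeeds (the doCallBack()
// behaviour described earlier in this thread).
func consumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
	handleWithRetry func(*sarama.ConsumerMessage) error) error {
	for {
		// Check for shutdown before pulling the next message. Otherwise a
		// terminating pod can consume and mark the NEXT (valid) message,
		// committing an offset past the failed one and dropping it.
		select {
		case <-session.Context().Done():
			return nil
		default:
		}

		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			if err := handleWithRetry(msg); err != nil {
				log.Printf("Too many failed attempts at processing Kafka message: %v", err)
			}
		case <-session.Context().Done():
			return nil
		}
	}
}
```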

Testing it right now...

passuied commented 2 months ago

> I can reproduce this as well locally, but I suspect something is off with my local Docker-based Kafka, as this reproduces all the way back to Dapr 1.10.x (I didn't test further).

Your local issue is probably because (like me earlier) the first message you're processing is invalid, and therefore the offset was NEVER committed for this group. So on the next restart it defaults to the newest offset... which is normal behavior...

It took me a while to figure this one out! :D

yaron2 commented 2 months ago

> > I can reproduce this as well locally, but I suspect something is off with my local Docker-based Kafka, as this reproduces all the way back to Dapr 1.10.x (I didn't test further).
>
> Your local issue is probably because (like me earlier) the first message you're processing is invalid, and therefore the offset was NEVER committed for this group. So on the next restart it defaults to the newest offset... which is normal behavior...
>
> It took me a while to figure this one out! :D

I literally just now noticed this. Setting initialOffset: oldest resolves it.
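
For context, a sketch of what that metadata setting maps to in sarama terms (the config field and constants are sarama's; the wrapper function is illustrative). sarama's default is OffsetNewest, which is why a group that never committed an offset starts at the end of the topic after a restart:

```go
package kafkaexample

import "github.com/IBM/sarama"

// newConsumerConfig sketches how an initialOffset value of "oldest" vs
// "newest" maps onto sarama's consumer configuration.
func newConsumerConfig(initialOffset string) *sarama.Config {
	cfg := sarama.NewConfig()
	if initialOffset == "oldest" {
		cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	} else {
		cfg.Consumer.Offsets.Initial = sarama.OffsetNewest // sarama's default
	}
	return cfg
}
```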

passuied commented 2 months ago

This issue is now resolved.