twmb / franz-go

franz-go contains a feature-complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Fetch returns duplicate records after Kafka (RedPanda) disconnect #713

Closed genzgd closed 2 months ago

genzgd commented 2 months ago

We're testing the behavior of PollRecords during a temporary Kafka outage and we're trying to avoid getting duplicate records.

In our integration test, we read 10k records from Redpanda, call PollRecords again, and, in a separate goroutine, commit. While the subsequent PollRecords is running, we pause the Redpanda container for 45 seconds. We then produce another 10k records to the same topic. The follow-up PollRecords (which has been running for 45 seconds) successfully picks up those 10k records, but doesn't have time to commit them before we get a heartbeat error and the client tries to rejoin the group:

2024-04-27 05:51:01.591 INF heartbeat errored err="UNKNOWN_MEMBER_ID: The coordinator is not aware of this member." group=TestKafkaRetries
2024-04-27 05:51:01.591 DBG entering OnPartitionsLost format=json org_id=test-org pipe_id=TestKafkaRetries service_id= type=kafka with={"trips_json":[0]}
2024-04-27 05:51:01.591 INF injecting fake fetch with an error err="unable to join group session: UNKNOWN_MEMBER_ID: The coordinator is not aware of this member." why="notification of group management loop error"
2024-04-27 05:51:01.591 INF assigning partitions format=json how=1 input=null why="clearing assignment at end of group management session"
2024-04-27 05:51:01.591 ERR join and sync loop errored backoff=213.24197 consecutive_errors=1 err="UNKNOWN_MEMBER_ID: The coordinator is not aware of this member." 

When the new group session begins, it picks up the commit from the first 10k records, but then rereads the second 10k and we end up processing them again.

Without tracking offsets ourselves for "exactly once" semantics, is there some way to avoid this scenario? It's challenging that the PollRecords group "survives" the 45-second outage and fetches records and everything seems okay, but then the heartbeat comes back and says "this is broken, I have to start over".
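
For context, a minimal sketch of the poll/commit flow described above (the broker address, group name, topic, and 10k batch size are illustrative placeholders, not the actual test code):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Hypothetical broker address, group, and topic, used only for illustration.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("TestKafkaRetries"),
		kgo.ConsumeTopics("trips_json"),
		kgo.DisableAutoCommit(), // commits are issued manually below
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		// Blocks until records or an error arrive.
		fetches := cl.PollRecords(ctx, 10_000)
		if errs := fetches.Errors(); len(errs) > 0 {
			log.Printf("fetch errors: %v", errs)
		}
		recs := fetches.Records()

		// Commit in a separate goroutine while the next PollRecords runs,
		// mirroring the test setup described above.
		go func() {
			cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
			defer cancel()
			if err := cl.CommitRecords(cctx, recs...); err != nil {
				log.Printf("commit failed: %v", err)
			}
		}()
	}
}
```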

twmb commented 2 months ago

IIRC, a paused container hangs network traffic, but it's still possible to dial.

The 45s problem you're running into is the session timeout. You're freezing the container for exactly as long as the session timeout, so the group member must be kicked, and the commit must fail.

You could freeze the container for a few seconds less, or raise the session timeout above how long you're freezing the container. This would allow the heartbeat to eventually succeed, and you won't have a failed commit.

The PollRecords group is surviving for 45s because, for those 45s, the member is fine according to both the broker and the client. On the client side, there is an internal default timeout for heartbeat requests that matches the session timeout. On the broker side, the group member is considered alive, even without a heartbeat, until the session timeout is hit. At 45s, the client cuts the heartbeat request and fails the group; at 45s, the broker kicks the client from the group for not having sent a heartbeat.

If you want to lower the bound on the failure, you could lower the SessionTimeout. This controls how long a group member can go without checking in (commits or heartbeats) before the member is kicked on the broker and the client itself notices it should cancel any in-flight request and fail.
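
For reference, a minimal sketch of setting these timeouts at client construction, assuming the kgo.SessionTimeout and kgo.HeartbeatInterval options; the broker address, group, topic, and the specific durations are placeholders, not recommendations:

```go
package main

import (
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func newClient() (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // hypothetical broker address
		kgo.ConsumerGroup("TestKafkaRetries"),
		kgo.ConsumeTopics("trips_json"),
		// Lower the session timeout so a hung or kicked member is detected
		// sooner, and keep the heartbeat interval comfortably below it.
		kgo.SessionTimeout(15*time.Second),
		kgo.HeartbeatInterval(3*time.Second),
	)
}

func main() {
	cl, err := newClient()
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```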

Another alternative might be to internally kill the heartbeat request after, say, 2x the heartbeat interval (so 6s rather than the default session timeout of 45s), but this is a different tradeoff with other downsides. If there truly are network issues that are causing heartbeat requests to be slow, a 6s timeout could be cutting a request that would have succeeded, and then the client itself is actively sabotaging its own group membership.

genzgd commented 2 months ago

Thanks for the detailed explanation! Yes, "pausing" the docker container still allows the dial, so it's not a particularly "real world" failure situation. I assumed the 45 seconds was hitting some exact timeout, since that's when we started seeing test failures. We'll consider the options. One more quick question: how do we "kill the heartbeat request"?

twmb commented 1 month ago

I somewhat miswrote -- you can't manually kill the heartbeat request. Internally, it is killed if you leave the group or close the client. However, you could add your own RetryTimeoutFn that returns a smaller time for the Heartbeat request specifically, overriding the default 45s that is applied by this function.
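
A sketch of that idea, assuming RetryTimeoutFn receives the request's API key as an int16 and returns the overall timeout for that request (Heartbeat is API key 12 in the Kafka protocol); the values here are illustrative only:

```go
package main

import (
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // hypothetical broker address
		kgo.ConsumerGroup("TestKafkaRetries"),
		kgo.ConsumeTopics("trips_json"),
		// Give Heartbeat (Kafka API key 12) a tighter overall timeout so a
		// hung heartbeat is cut sooner; other requests fall back to a longer
		// timeout. This trades off against cutting slow-but-healthy requests.
		kgo.RetryTimeoutFn(func(key int16) time.Duration {
			if key == 12 { // 12 = Heartbeat
				return 6 * time.Second
			}
			return 30 * time.Second
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```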

The two config knobs to control here are RequestTimeoutOverhead and RetryTimeoutFn. I don't really recommend touching either of them, though. It's possible I could add some further control option to the client, but I'm not sure what to add here or why to add it -- it's not really normal for a dial to succeed but then for network traffic to hang completely.