twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.7+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Question - Purging and adding topics results in redelivery of old messages despite last offset #727

Closed StarpTech closed 4 months ago

StarpTech commented 4 months ago

Hi, we use AddConsumeTopics(foo) to add new topics to the client dynamically. This works great. Unfortunately, we experienced redelivery of old messages once PurgeTopicsFromConsuming(foo) had been called after AddConsumeTopics(foo). It seems that the client forgets that we have set the offset to kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())). Is this expected? I believe it happens because PurgeTopicsFromConsuming(foo) also removes the offset state.

We were able to work around this by using PauseFetchTopics and ResumeFetchTopics.

twmb commented 4 months ago

I'm a bit confused here -- when you say "It seems that the client forgets that we have set the offset to ... time.Now()", doesn't that mean that time.Now() is evaluated once, and then any time you purge and re-add the topic, re-adding it will re-consume from the original offset milli?

What is your expected goal? Purging does remove literally everything about the state of a topic within the client. What is not removed is any initial configuration -- initial configuration is used when re-adding the topic.
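The distinction between per-topic state (dropped on purge) and initial configuration (kept and reused on re-add) can be illustrated with a small toy model. This is only a sketch under stated assumptions: `toyClient` and its methods are hypothetical names invented for illustration and are not franz-go's implementation. They model nothing more than "configuration survives a purge, state does not".

```go
package main

import "fmt"

// toyClient is a hypothetical stand-in for a consumer client. It is NOT
// franz-go code; it only models the behavior described in this thread.
type toyClient struct {
	resetMilli int64            // initial configuration, fixed once at construction
	position   map[string]int64 // per-topic state: timestamp consumption has reached
}

func newToyClient(resetMilli int64) *toyClient {
	return &toyClient{resetMilli: resetMilli, position: make(map[string]int64)}
}

// add starts a topic at the configured reset point unless state already exists.
func (c *toyClient) add(topic string) {
	if _, ok := c.position[topic]; !ok {
		c.position[topic] = c.resetMilli
	}
}

// advance simulates consuming records up to the given timestamp.
func (c *toyClient) advance(topic string, toMilli int64) {
	if _, ok := c.position[topic]; ok {
		c.position[topic] = toMilli
	}
}

// purge drops all state for the topic; the configuration is left untouched.
func (c *toyClient) purge(topic string) {
	delete(c.position, topic)
}

func main() {
	c := newToyClient(1000) // "now" captured once, when the client is created
	c.add("foo")
	c.advance("foo", 5000) // consumption has moved well past the reset point

	c.purge("foo")
	c.add("foo") // re-adding falls back to the original configuration

	fmt.Println(c.position["foo"]) // prints 1000, not 5000
}
```

In this model, everything between 1000 and 5000 would be consumed a second time after the re-add, which matches the redelivery reported above.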

If you intend to pause consuming and then resume later, PauseFetchTopics and ResumeFetchTopics are the two correct APIs to use.
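A pause-then-resume flow might be wrapped roughly as follows. This is a minimal sketch, not an official pattern: the `fetchPauser` interface, the `withTopicsPaused` helper, and the `fakePauser` test double are hypothetical names introduced here. The two interface methods are assumed to match the signatures of `PauseFetchTopics` and `ResumeFetchTopics` on `*kgo.Client`, so that a real client could be passed in place of the fake.

```go
package main

import (
	"fmt"
	"sort"
)

// fetchPauser is a hypothetical interface for this sketch. Its methods are
// assumed to mirror the corresponding methods on franz-go's *kgo.Client.
type fetchPauser interface {
	PauseFetchTopics(topics ...string) []string
	ResumeFetchTopics(topics ...string)
}

// withTopicsPaused pauses fetching for the topics, runs fn, and resumes
// fetching afterwards, even if fn panics.
func withTopicsPaused(p fetchPauser, fn func(), topics ...string) {
	p.PauseFetchTopics(topics...)
	defer p.ResumeFetchTopics(topics...)
	fn()
}

// fakePauser is an in-memory double so the sketch runs without a broker.
type fakePauser struct{ paused map[string]bool }

// PauseFetchTopics marks the topics as paused and returns all paused topics.
func (f *fakePauser) PauseFetchTopics(topics ...string) []string {
	for _, t := range topics {
		f.paused[t] = true
	}
	out := make([]string, 0, len(f.paused))
	for t := range f.paused {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

// ResumeFetchTopics clears the paused mark for the topics.
func (f *fakePauser) ResumeFetchTopics(topics ...string) {
	for _, t := range topics {
		delete(f.paused, t)
	}
}

func main() {
	f := &fakePauser{paused: map[string]bool{}}
	withTopicsPaused(f, func() {
		fmt.Println("paused during fn:", f.paused["foo"]) // true
	}, "foo")
	fmt.Println("paused after fn:", f.paused["foo"]) // false
}
```

Because pausing does not discard the topic's state, consumption continues from where it stopped once the topic is resumed, rather than restarting from the configured reset offset.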

I'm going to pre-emptively close this issue hoping my message answers your question, but please reply if it did not.

StarpTech commented 4 months ago

> I'm a bit confused here -- when you say "It seems that the client forgets that we have set the offset to ... time.Now()", doesn't that mean that time.Now() is evaluated once, and then any time you purge and re-add the topic, re-adding it will re-consume from the original offset milli?

Exactly, that's the conclusion I came to as well. It was not obvious to me because I created the client once with the offset config. Thanks for confirming.