akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Off-by-one issue in offset committer #53

Closed suiryc closed 8 years ago

suiryc commented 8 years ago

Hi,

I believe there is an issue in the offset committer. As per the Kafka API, the committed offset is the position the consumer shall read from for its next message. So when flushing offsets (which goes through NativeCommitter.commitPhaseTrial), the offset of the last processed message is incremented by 1 (OffsetMap.toCommitRequestInfo) before being sent, and the result (NativeCommitter.getOffsetsTrial) is kept. However, this result is then used as the base of comparison for the next flush trigger (ConsumerCommitter.commitGatheredOffsets). The issue is that the code compares values with different meanings: a registered commit request contains the last processed offset, while the flush response contains the first unprocessed offset.

For example if you enable debug logging, you get the following result:

...
[c.s.r.kafka.commit.ConsumerCommitter] Received commit request for partition 0 and offset 34
[c.s.r.kafka.commit.ConsumerCommitter] Registering commit for partition 0 and offset 34, last registered = 33
[c.s.r.kafka.commit.ConsumerCommitter] Flushing offsets to commit
[c.s.r.kafka.commit.ConsumerCommitter] committed offsets: OffsetMap(Map([topic-name,0] -> 35))
[c.s.r.kafka.commit.ConsumerCommitter] Received commit request for partition 0 and offset 35
[c.s.r.kafka.commit.ConsumerCommitter] Registering commit for partition 0 and offset 35, last registered = 34
...

As you can see, offset 34 is registered, but the committer will send 34+1=35 (as can be seen in the committed offsets).
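The off-by-one can be reduced to a tiny model. This is a sketch with illustrative names, not the actual reactive-kafka API:

```scala
// Minimal model of the offset mismatch; names are illustrative,
// not the actual reactive-kafka API.
object OffsetSemantics {
  // What the committer registers: the offset of the last processed message.
  def registeredOffset(lastProcessed: Long): Long = lastProcessed

  // What Kafka stores on commit: the position of the next message to read,
  // i.e. last processed + 1 (the increment done before sending the request).
  def committedOffset(lastProcessed: Long): Long = lastProcessed + 1

  // The problematic comparison: a newly registered offset is checked against
  // the previous flush result. For the very next message these values are
  // equal, so the new commit request looks like it was already committed.
  def looksAlreadyCommitted(registered: Long, lastFlushResult: Long): Boolean =
    registered <= lastFlushResult
}
```

With this model, registering offset 35 right after offset 34 was committed (as 35) is wrongly treated as already committed.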

This triggers two issues when at most one commit request (for the same topic) is received between one flush and the next:

  1. The last commit request is not committed: ConsumerCommitter.commitGatheredOffsets compares the received value to the previous flush result (which are equal in this case) and does nothing when there is no difference (the diff stored in offsetMapToFlush is empty).
  2. Even if new commit requests are received later, they are no longer processed: because the code believed there was nothing to commit (see 1.), the performFlush function was not called and the scheduledFlush variable was not reset to None. This prevents a registered commit request from scheduling a new flush (one is believed to be already pending). This issue was introduced with e2d3e4775df81e9ea085bd96f77f557eaeef505f.
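The failure mode in both points can be sketched as follows. This is a simplified model of the control flow described above, not the actual ConsumerCommitter code:

```scala
// Simplified model of the buggy flush path; not the actual
// ConsumerCommitter code, only its control flow as described.
object FlushSketch {
  final case class State(
    registered: Map[Int, Long],   // partition -> last processed offset
    lastFlushed: Map[Int, Long],  // partition -> previous commit result (last processed + 1)
    scheduledFlush: Boolean)      // models the scheduledFlush variable

  // Models commitGatheredOffsets: diff the registered offsets against the
  // previous flush result; when the diff is empty, skip the flush entirely,
  // which also leaves scheduledFlush set (issue 2).
  def flush(s: State): State = {
    val diff = s.registered.filter { case (p, off) => !s.lastFlushed.get(p).contains(off) }
    if (diff.isEmpty) s // nothing flushed, and scheduledFlush is never cleared
    else State(
      s.registered,
      s.lastFlushed ++ diff.map { case (p, off) => p -> (off + 1) },
      scheduledFlush = false)
  }
}
```

Starting from the state after the log excerpt above (offset 35 registered, previous flush result 35), the diff is empty, so offset 35 is never committed and the flush stays "pending" forever.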
suiryc commented 8 years ago

Tried a quick fix by resetting the scheduled-flush variable as the first step when flushing, and adapting the fetch request response so that it keeps the offset of the last fetched message (as expected by the committer code). This fixed both issues I encountered (last commit not performed, and committer not flushing anymore in the described scenario).

I was not sure where to put the offset - 1 when processing the fetch request response. Since the opposite adjustment was done before going through the NativeCommitter, I put it right where the response is kept in ConsumerCommitter.
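In terms of the earlier model, the described fix amounts to the following sketch. This is illustrative only, not the actual patch:

```scala
// Sketch of the described fix, in the same simplified model as above;
// illustrative only, not the actual patch.
object FixedFlushSketch {
  final case class State(
    registered: Map[Int, Long],   // partition -> last processed offset
    lastFlushed: Map[Int, Long],  // partition -> last processed offset (response - 1)
    scheduledFlush: Boolean)

  def flush(s: State): State = {
    // First step of any flush: clear scheduledFlush, so a later commit
    // request can always schedule a new flush (fixes issue 2).
    val cleared = s.copy(scheduledFlush = false)
    val diff = cleared.registered.filter { case (p, off) =>
      !cleared.lastFlushed.get(p).contains(off)
    }
    if (diff.isEmpty) cleared
    // Kafka is still sent off + 1, but the kept result is adjusted back to
    // off, so both sides of the comparison mean "last processed offset"
    // (fixes issue 1).
    else cleared.copy(lastFlushed = cleared.lastFlushed ++ diff)
  }
}
```

With this change, committing offset 34 and then registering offset 35 produces a non-empty diff, so the single new message is committed and a new flush can be scheduled.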

Feel free to take this fix or make a smarter one if necessary.

kciesielski commented 8 years ago

Hi @suiryc, that indeed looks like a serious issue, thanks a lot for such a detailed analysis. I'm still trying to confirm it by writing a failing test and checking whether your fix makes it pass. By the way, how did you stumble upon this bug?

suiryc commented 8 years ago

At first, with version 0.8.1, I was simply looking for a way to view the backlog for a consumer using KafkaOffsetMonitor, manually triggering a producer to write messages to a topic one by one. From time to time I observed that the 'lag' (remaining number of messages) was 1 while all messages had been consumed. Inserting a new message later would bump it to 2, then back to 0 as expected. I thought the monitor might be at fault, and it didn't bother me much at the time.

After the manual tests, I was working with a consumer that was processing messages at a 'slow' pace (less than 10 per second), and was experimenting with the committer rate (setting it to 100ms, for example). Then I moved to 0.8.2, and saw through the monitor that after some time the offsets stopped changing while the consumer was still working. Since restarting the consumer really did fetch the same already-processed messages again, it was almost certain that the offsets were indeed no longer committed. So I changed the log level to debug for the committer and noticed it really did stop logging commits at some point. What remained was to look at the code to understand how it could happen.

I had a producer inject one message at a time to manually check my fix was working.

kciesielski commented 8 years ago

@suiryc Impressive investigation :) I will prepare a bugfix release (0.8.3) as soon as I finish writing a test for this case.

kciesielski commented 8 years ago

@suiryc Could you submit a PR with your fix?

suiryc commented 8 years ago

See pull request #55