IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Re-ordering of offset commit requests can cause committed offset to move "backwards". #2940

Closed prestona closed 3 months ago

prestona commented 4 months ago
Description

Offsets committed by multiple consumer group sessions can be re-ordered by the time they arrive at Kafka, potentially resulting in duplicate message delivery, should the offset be used to resume consumption.

If a single sarama.ConsumerGroup is consuming from more than one partition, then reordering of offset commit requests can occur. This is because of a race condition between the goroutines used to run the ConsumeClaim method for different topic partitions.

When ConsumerGroupSession.Commit() is called, this ends up calling into offsetManager.flushToBroker(). This in turn:

  1. Builds an offset commit request, using any so-far uncommitted offsets across all of the partitions belonging to the group
  2. Finds the group's coordinator (if not already known)
  3. Sends the offset commit request to the broker that is the group coordinator.

However, there is no locking to prevent two (or more) goroutines from interleaving between building an offset commit request and sending that request to Kafka.
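To illustrate the kind of locking that is missing, here is a minimal sketch of a toy committer that holds one mutex across both the build and the send. The names `committer`, `mark`, `flush` and `run` are hypothetical and the "broker" is just a map. This is not Sarama's code, and it is not necessarily what either proposed fix does.

```go
package main

import (
	"fmt"
	"sync"
)

// committer is a hypothetical stand-in for an offset manager.
// It is NOT Sarama's implementation.
type committer struct {
	flushMu sync.Mutex // held across build + send, so flushes cannot interleave

	mu      sync.Mutex      // guards tracked
	tracked map[int32]int64 // partition -> next offset to consume

	broker      map[int32]int64 // what the simulated coordinator has stored
	regressions int             // times a stored offset moved backwards
}

func newCommitter() *committer {
	return &committer{tracked: map[int32]int64{}, broker: map[int32]int64{}}
}

// mark records the next offset to consume for a partition.
func (c *committer) mark(partition int32, next int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if next > c.tracked[partition] {
		c.tracked[partition] = next
	}
}

// flush builds a request from the tracked offsets and "sends" it.
// Because flushMu covers both steps, requests reach the simulated
// broker in the same order in which they were built.
func (c *committer) flush() {
	c.flushMu.Lock()
	defer c.flushMu.Unlock()

	// Build: snapshot the tracked offsets.
	c.mu.Lock()
	req := make(map[int32]int64, len(c.tracked))
	for p, off := range c.tracked {
		req[p] = off
	}
	c.mu.Unlock()

	// Send: the simulated broker keeps whatever arrives last.
	for p, off := range req {
		if off < c.broker[p] {
			c.regressions++
		}
		c.broker[p] = off
	}
}

// run starts one goroutine per partition, each marking and flushing n
// times, and returns how often a stored offset moved backwards.
func run(n int) int {
	c := newCommitter()
	var wg sync.WaitGroup
	for p := int32(0); p < 2; p++ {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			for i := 1; i <= n; i++ {
				c.mark(p, int64(i))
				c.flush()
			}
		}(p)
	}
	wg.Wait()
	return c.regressions
}

func main() {
	fmt.Println("regressions:", run(1000)) // regressions: 0
}
```

In this sketch, marking uses a separate mutex so that a goroutine recording an offset does not have to wait for an in-flight send. Only whole flushes are serialized.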

For example, consider the following interactions between two goroutines P0 and P1 - each belonging to the same consumer group, but consuming from different partitions of a topic:

  1. P0 marks offset 10 and calls commit (the uncommitted offsets tracked by the session are: {P0 -> 11})
  2. P0 builds the offset commit request (containing {P0 -> 11}).
  3. P0 goroutine yields, and P1 goroutine is scheduled
  4. P1 marks offset 20, calls commit, and builds its offset commit request (the uncommitted offsets tracked by the session, and therefore the request contents, are: {P0 -> 11, P1 -> 21})
  5. P1 goroutine yields before sending its request, and P0 goroutine is scheduled
  6. P0 sends the offset commit request ({P0 -> 11}), gets the response, and resumes running code in the ConsumeClaim method
  7. P0 marks offset 11 and calls commit (the uncommitted offsets tracked by the session are: {P0 -> 12, P1 -> 21})
  8. P0 builds the offset commit request (containing {P0 -> 12, P1 -> 21}).
  9. P0 sends its offset commit request (containing {P0 -> 12, P1 -> 21})
  10. P0 goroutine yields, and P1 goroutine is scheduled
  11. P1 sends its offset commit request (containing {P0 -> 11, P1 -> 21}).

From the Kafka broker's perspective, it receives:

  1. Offset commit request (step 6): {P0 -> 11}
  2. Offset commit request (step 9): {P0 -> 12, P1 -> 21}
  3. Offset commit request (step 11): {P0 -> 11, P1 -> 21}

This causes the offset committed for P0 to move "backwards" from 12 to 11.
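The interleaving above can be replayed deterministically with a toy model. The sketch below is illustrative only: `broker`, `snapshot` and `replay` are hypothetical names, the "session" is a plain map of the latest marked offsets, and no Sarama code or real scheduling is involved.

```go
package main

import "fmt"

// offsets maps a partition number to the next offset to consume.
type offsets map[int32]int64

// broker is a toy stand-in for the group coordinator: for each partition
// it keeps whichever committed offset arrived last.
type broker struct {
	committed offsets
	history   []int64 // partition 0's committed offset after each request
}

func (b *broker) commit(req offsets) {
	for p, off := range req {
		b.committed[p] = off
	}
	b.history = append(b.history, b.committed[0])
}

// snapshot copies the tracked offsets, modelling "build the request".
func snapshot(tracked offsets) offsets {
	req := make(offsets, len(tracked))
	for p, off := range tracked {
		req[p] = off
	}
	return req
}

// replay performs the build and send steps in the order described in the
// issue and returns the committed offset of partition 0 after each send.
func replay() []int64 {
	b := &broker{committed: offsets{}}
	tracked := offsets{}

	tracked[0] = 11             // step 1: P0 marks offset 10
	reqP0a := snapshot(tracked) // step 2: P0 builds {P0 -> 11}
	tracked[1] = 21             // step 4: P1 marks offset 20
	reqP1 := snapshot(tracked)  // P1 builds {P0 -> 11, P1 -> 21}, sent in step 11
	b.commit(reqP0a)            // step 6: P0 sends {P0 -> 11}
	tracked[0] = 12             // step 7: P0 marks offset 11
	reqP0b := snapshot(tracked) // step 8: P0 builds {P0 -> 12, P1 -> 21}
	b.commit(reqP0b)            // step 9: P0 sends it
	b.commit(reqP1)             // step 11: P1 sends its stale request
	return b.history
}

func main() {
	fmt.Println(replay()) // [11 12 11]
}
```

The last entry shows partition 0's committed offset dropping from 12 back to 11, because the stale request built by P1 was delivered after the newer request from P0.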

Due to the nature of this race condition, it is more likely to occur if:

Versions
| Sarama | Kafka | Go |
|--------|-------|----|
| main (d2246cc9b9df113f3ffed7c442fd81e8b9cf736e) | 3.6 and 3.8 | 1.21.10 |
Configuration
    config := sarama.NewConfig()
    config.Version = sarama.V3_3_0_0
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = false
Logs

Not applicable. Sarama does not emit any logging relating to this problem.

Additional Context
prestona commented 3 months ago

I've opened a couple of PRs for this one.

I think I favor #2941, but either should fix this issue.