Closed ennru closed 5 years ago
Roughly, our code that is hitting this issue looks like this:
val dataSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () =>
  // Our class that sets up and returns a Kafka Consumer Source (elided here)
  val kafkaConsumer = ???
  // Keep a reference to the Control of the most recently created consumer
  kafkaConsumer.mapMaterializedValue { c => consumerControl.set(c) }
}
// Commits are batched outside the RestartSource
val dataFuture = dataSource
  .toMat(Committer.sink(CommitterSettings(system)))(Keep.right)
  .run()
We do attach some flow steps to the source, but it is still returned as a source. Intermittently, in circumstances where the flow has restarted, we get the error listed above when attempting to commit. @ennru's diagnosis accurately explains what we're seeing.
I dug into this and found a good solution (and fixed another bug) in #953.
The commit batching expects exactly one consumer actor per group ID as offsets are sent back to that consumer actor for committing:
https://github.com/akka/alpakka-kafka/blob/d93b4b81bcdf68251b7e205181717fd6d8914441/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala#L207-L217
This does not work when the consumer is wrapped in a RestartSource and the commits are batched outside of it. Whenever the RestartSource fails and creates a new consumer, those offsets will refer to a new consumer actor.
If ordering can be assumed, it might be correct to flush the offsets that arrived for a different actor for the same group ID.
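To make the flushing idea concrete, here is a minimal sketch in plain Scala. It is a toy model and not the Alpakka Kafka implementation: `PendingOffset`, `Batch`, `batch`, and the string standing in for the consumer actor reference are all hypothetical names introduced for illustration. It assumes offsets arrive in order, and closes the open batch as soon as an offset arrives for a different consumer actor, so no batch ever mixes offsets for two actors.

```scala
// Hypothetical model for illustration only; none of these names exist in Alpakka Kafka.
object CommitBatchingSketch {

  /** An offset plus the identity of the consumer actor that has to commit it. */
  final case class PendingOffset(groupId: String, partition: Int, offset: Long, consumerActor: String)

  /** A batch for exactly one consumer actor: latest offset seen per (group ID, partition). */
  final case class Batch(consumerActor: String, offsets: Map[(String, Int), Long])

  /**
   * Folds offsets into batches of at most `maxBatch` elements. The open batch is
   * flushed early when an offset for a different consumer actor arrives.
   * Assumes offsets arrive in order, so the latest one per partition wins.
   */
  def batch(offsets: Seq[PendingOffset], maxBatch: Int): Vector[Batch] = {
    require(maxBatch > 0, "maxBatch must be positive")

    def single(o: PendingOffset): Batch =
      Batch(o.consumerActor, Map((o.groupId, o.partition) -> o.offset))

    // State: batches already flushed, the open batch (if any), elements in the open batch.
    val start = (Vector.empty[Batch], Option.empty[Batch], 0)

    val (flushed, open, _) = offsets.foldLeft(start) {
      case ((done, current, count), o) =>
        current match {
          case Some(b) if b.consumerActor == o.consumerActor && count < maxBatch =>
            // Same actor and room left: merge into the open batch.
            val merged = b.copy(offsets = b.offsets.updated((o.groupId, o.partition), o.offset))
            (done, Some(merged), count + 1)
          case Some(b) =>
            // Different actor (for example after a restart) or batch full: flush, then start anew.
            (done :+ b, Some(single(o)), 1)
          case None =>
            (done, Some(single(o)), 1)
        }
    }
    flushed ++ open.toList
  }
}
```

For example, two offsets for `consumer-actor-1` followed by one for `consumer-actor-2` (a made-up name for the actor created after a restart) yield two batches rather than one mixed batch:

```scala
import CommitBatchingSketch._

val batches = batch(
  Seq(
    PendingOffset("group-1", 0, 10L, "consumer-actor-1"),
    PendingOffset("group-1", 0, 11L, "consumer-actor-1"),
    PendingOffset("group-1", 0, 8L, "consumer-actor-2") // emitted after the restart
  ),
  maxBatch = 100
)
// batches(0) targets consumer-actor-1, batches(1) targets consumer-actor-2
```

Whether the flushed batch for the old actor can still be committed is a separate question that this sketch does not address.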