akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

KafkaConsumerActor hangs after timeout exception while fetching consumer position #947

Open pbala-github opened 4 years ago

pbala-github commented 4 years ago

After getting a partition assignment, KafkaConsumerActor tries to retrieve the position of the consumer:

    def assignedPositions(assignedTps: Set[TopicPartition], consumer: Consumer[_, _], positionTimeout: java.time.Duration): Unit = {
      val assignedOffsets = assignedTps.map(tp => tp -> consumer.position(tp, positionTimeout)).toMap
      assignedPositions(assignedTps, assignedOffsets)
    }

However, the call fails with a timeout:

    2019-10-23 10:38:06.905 ERROR [LP7RP1PC2] [:] [bet-consumer-dispatcher-5] o.a.k.c.c.i.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=bet-consumer-aa] User provided listener akka.kafka.internal.KafkaConsumerActor$RebalanceListenerImpl failed on partition assignment
    org.apache.kafka.common.errors.TimeoutException: Timeout of 1ms expired before the position for partition bets-4 could be determined

Nothing happens from that point on: no messages are consumed and no rebalance takes place. We first noticed this issue in version 1.0.1, but it is also present in 1.1.0 when a value is set for the commit-refresh-interval property.

Note: I've set position-timeout to 1ms in order to reproduce the issue.

ennru commented 4 years ago

Thanks for reporting. I think the consumer.position call in CommitRefreshing.Impl needs to be guarded with try-catch.

Are you aware that Kafka 2.1 fixed the issue that required the commit refreshing?

Please feel free to suggest the necessary changes in a Pull Request.
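
For illustration, the guard suggested above might look roughly like the following. This is only a sketch, not the actual change to CommitRefreshing.Impl. `TopicPartition`, `PositionTimeoutException`, `PositionLookup`, and `GuardedPositions` are hypothetical stand-ins, defined here so the example runs without the Kafka client on the classpath. In the real code the lookup would be `consumer.position(tp, positionTimeout)` on the Kafka `Consumer`.

```scala
import java.time.Duration
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins (assumptions) for the Kafka client types.
final case class TopicPartition(topic: String, partition: Int)
final class PositionTimeoutException(message: String) extends RuntimeException(message)

trait PositionLookup {
  // Same shape as the consumer.position(tp, positionTimeout) call quoted in the issue.
  def position(tp: TopicPartition, timeout: Duration): Long
}

object GuardedPositions {
  // Looks up the position of every assigned partition. A failing lookup is
  // captured per partition instead of escaping to the caller, which in the
  // issue is the rebalance listener.
  def lookup(
      assignedTps: Set[TopicPartition],
      consumer: PositionLookup,
      positionTimeout: Duration
  ): (Map[TopicPartition, Long], Map[TopicPartition, Throwable]) = {
    val results =
      assignedTps.map(tp => tp -> Try(consumer.position(tp, positionTimeout))).toMap
    val positions = results.collect { case (tp, Success(offset)) => tp -> offset }
    val failures = results.collect { case (tp, Failure(error)) => tp -> error }
    (positions, failures)
  }
}
```

With the failures returned as data, the caller can decide what to do with them, for example log them and retry later, or fail the stream explicitly so that it does not stay silent. Which of these is right for the connector is left open here.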

pbala-github commented 4 years ago

@ennru Thanks for your immediate feedback.

Yes, I know it's been fixed in Kafka 2.1, but unfortunately we cannot upgrade in the short term.