zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Consumer doesn't consume after `onLost` #1288

Closed. ya-at closed this issue 21 hours ago.

ya-at commented 3 months ago

When the broker goes down, the consumer loses its connection and tries to reconnect. Then `onLost` happens, and after that the runloop never calls `poll()` again, so there are no new events. Is this the desired behavior? If so, how can the consumer be restarted when `onLost` happens? (The last event was consumed at 20:08.) The application also has two consumer groups that read the same topic in parallel; one fails (`onLost` happens) while the other continues to work (`onLost` doesn't happen, since it is connected to a broker that doesn't go down).

Related #1250. Version: 2.8.0.
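For context, a minimal sketch of the setup described above; the broker address, topic name, group and client ids, and serdes are assumptions, not taken from the actual application:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// One consumer layer per consumer group (assumed names and broker address).
def consumerLayer(groupId: String, clientId: String) =
  ZLayer.scoped(
    Consumer.make(
      ConsumerSettings(List("broker-1:9092"))
        .withGroupId(groupId)
        .withClientId(clientId)
    )
  )

// Consume the shared topic and commit each offset after processing.
val consume =
  Consumer
    .plainStream(Subscription.topics("sample-topic"), Serde.string, Serde.string)
    .mapZIO(record => record.offset.commit.as(record))
    .runDrain

// Two consumer groups reading the same topic in parallel, as in the report:
// if one group's broker connection drops and onLost fires, the other keeps working.
val run =
  consume.provideLayer(consumerLayer("group-a", "client-a")) <&>
    consume.provideLayer(consumerLayer("group-b", "client-b"))
```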

Logs (the first message is the newest):

```
2024-07-19 20:10:37.441 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Offering partition assignment Set() location: Runloop.scala:523
2024-07-19 20:10:37.438 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop.makeRebalanceListener onLost done location: Runloop.scala:240
2024-07-19 20:10:37.433 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.PartitionStreamControl Partition sample-topic-3 lost location: PartitionStreamControl.scala:98 partition: 3 topic: sample-topic
2024-07-19 20:10:37.428 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop.makeRebalanceListener 1 partitions are lost location: Runloop.scala:234
2024-07-19 20:10:37.375 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions location: Runloop.scala:466
2024-07-19 20:10:37.375 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Processing 0 commits, 1 commands: Poll location: Runloop.scala:733
2024-07-19 20:10:37.325 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions location: Runloop.scala:466
2024-07-19 20:10:37.325 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Processing 0 commits, 1 commands: Poll location: Runloop.scala:733
2024-07-19 20:10:37.275 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions location: Runloop.scala:466
2024-07-19 20:10:37.275 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Processing 0 commits, 1 commands: Poll location: Runloop.scala:733
2024-07-19 20:10:37.225 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions location: Runloop.scala:466
2024-07-19 20:10:37.225 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Processing 0 commits, 1 commands: Poll location: Runloop.scala:733
2024-07-19 20:10:37.174 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions location: Runloop.scala:466
2024-07-19 20:10:37.174 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Processing 0 commits, 1 commands: Poll location: Runloop.scala:733
2024-07-19 20:10:37.124 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions location: Runloop.scala:466
2024-07-19 20:10:37.124 zio-kafka-runloop-thread-3 zio.kafka.consumer.internal.Runloop Processing 0 commits, 1 commands: Poll location: Runloop.scala:733
```
erikvanoosten commented 3 months ago

Hello @ya-at. Thanks for your detailed bug report. ~~I think we can release 2.8.1 and then (due to #1252) a lost partition is no longer considered fatal.~~

erikvanoosten commented 3 months ago

For now, your options are:

erikvanoosten commented 3 months ago

Correction: #1252 is already part of zio-kafka 2.8.0, so something else is going on.

erikvanoosten commented 3 months ago

The newest log line (the first one) indicates that no partitions (`Set()`) are assigned to this consumer. That should not cause polling to stop! (See `shouldPoll`: in this case `subscriptionState.isSubscribed` and `assignedStreams.isEmpty` should both be true.)

Can you check the Java consumer configurations?

ya-at commented 3 months ago

> Can you check the Java consumer configurations?

The settings are almost all defaults. The things we changed are `client.id`, `group.id`, and `metrics.reporter` (these options are passed through `ConsumerSettings`). I don't think it's because of `metrics.reporter`, since every consumer has this property.
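For reference, this is roughly how such options can be passed via `ConsumerSettings`; the ids and reporter class below are placeholders, and note that the standard Kafka property name for reporters is `metric.reporters`:

```scala
import zio.kafka.consumer.ConsumerSettings

// Placeholder values, passed the same way the report describes.
val settings =
  ConsumerSettings(List("broker-1:9092"))
    .withClientId("my-client")
    .withGroupId("my-group")
    .withProperty("metric.reporters", "com.example.MyMetricsReporter")
```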

josdirksen commented 1 day ago

We're seeing the same thing as well. In 2.7.5, where `onLost` triggered a failure, everything was working. After this change (https://github.com/zio/zio-kafka/pull/1252) we experience the same issue again that we had before the https://github.com/zio/zio-kafka/pull/1251 fix.

What we often see (especially with a low number of instances) is that all instances lose their partitions at the same time. Everything then stops processing, and no rebalances are triggered. We're moving back to 2.7.5, since that version has well-defined behaviour.

svroonland commented 1 day ago

```scala
def shouldPoll = subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
```

What if `assignedStreams` is not empty after all partitions are lost? There would be no more pending requests or commits, but still assigned streams, so `shouldPoll` becomes false.
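To make that concrete, here is the condition evaluated with made-up values for the state after all partitions are lost (simplified from the snippet above):

```scala
// Hypothetical run loop state after onLost, if the lost partition is never
// removed from the assigned streams (values are made up for illustration):
val isSubscribed    = true                   // still subscribed to sample-topic
val pendingRequests = List.empty[String]     // the lost stream no longer issues requests
val pendingCommits  = List.empty[String]     // nothing left to commit
val assignedStreams = List("sample-topic-3") // lost partition still counted as assigned

val shouldPoll =
  isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
// true && (false || false || false) == false, so the run loop stops calling poll()
```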

Are we sure we are clearing the lost partitions from `assignedStreams`? Looking at `RebalanceEvent.onLost`, we're not filling `endedStreams`, which would mean `assignedStreams` is not cleared of the lost partitions.
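For illustration only, a rough sketch of the kind of bookkeeping this suggests, using simplified stand-in types rather than the real `Runloop`/`RebalanceEvent` internals:

```scala
import zio.Chunk

// Simplified stand-ins; the real zio-kafka types and fields differ.
final case class StreamControl(topicPartition: String)

final case class RebalanceState(
  assignedStreams: Chunk[StreamControl],
  endedStreams: Chunk[StreamControl]
) {
  // On lost partitions, also record the affected streams as ended so the
  // run loop can drop them from assignedStreams and shouldPoll stays true.
  def onLost(lost: Set[String]): RebalanceState = {
    val (lostStreams, remaining) =
      assignedStreams.partition(s => lost.contains(s.topicPartition))
    copy(assignedStreams = remaining, endedStreams = endedStreams ++ lostStreams)
  }
}
```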

WDYT @erikvanoosten

erikvanoosten commented 23 hours ago

> Are we sure we are clearing the lost partitions from `assignedStreams`? Looking at `RebalanceEvent.onLost`, we're not filling `endedStreams`, which would mean `assignedStreams` is not cleared of the lost partitions.

Yes, that sounds extremely plausible! Good find!

svroonland commented 20 hours ago

The issue of no more polling after all partitions were lost is (very likely, assuming our reproduction is fully representative of the issue) fixed in v2.8.3.
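For anyone hitting this, picking up the fix is a one-line dependency bump in an sbt build (version taken from the comment above):

```scala
// build.sbt
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.8.3"
```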