wbarnha / kafka-python-ng

Fork for Python client for Apache Kafka
https://wbarnha.github.io/kafka-python-ng/
Apache License 2.0
78 stars 11 forks source link

KafkaConsumer does not revoke deleted partitions when using a ConsumerRebalanceListener #120

Open wbarnha opened 8 months ago

wbarnha commented 8 months ago

In my system, a KafkaConsumer uses a regex subscription to subscribe to topics in the form of <user_id>.<topic>. A ConsumberRebalanceListener is used to write topic offsets and the aggregated topic states into a local cache. Automated tests create and delete users (i.e. topics).

When new topics are created, everything works fine. The logs show that the subscription changed, currently subscribed partitions are revoked and the partitions of the created topics are added to the set of previously assigned TopicsPartitions.

When a topic is deleted (via a DeleteTopicsRequest), the log shows that the subscription changes ("Updatding subscribed topics to: …"). The subscription no longer contains the deleted topics, which is correct. Afterwards the currently assigned partitions are revoked in ConsumerCoordinator._on_join_prepare ("Revoking previously assigned partitions …"). However, the set of revoked partitions no longer contains the partitions of the deleted topic, because the subscription was already updated.

I would expect that deleted TopicPartitions are also passed to ConsumerRebalanceListener.on_partitions_revoked. Is my expectation wrong or is this a bug?