aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

how to handle UnknownTopicOrPartitionError? #793

Open dongho-jung opened 2 years ago

dongho-jung commented 2 years ago

What I want to do is dynamically subscribe to topics whose names match a specific pattern, like email.service.*. By dynamically I mean that while the consumer is running, I should be able to create or delete topics and it should handle that.

Creating a topic and having it picked up dynamically worked fine. However, if I delete a topic that is already subscribed, the consumer spews out a lot of error messages like the following:

OffsetCommit failed for group email.service on partition TopicPartition(topic='email.service.order', partition=0) with offset OffsetAndMetadata(offset=1, metadata=''): UnknownTopicOrPartitionError
OffsetCommit failed for group email.service on partition TopicPartition(topic='email.service.order', partition=0) with offset OffsetAndMetadata(offset=1, metadata=''): UnknownTopicOrPartitionError
OffsetCommit failed for group email.service on partition TopicPartition(topic='email.service.order', partition=0) with offset OffsetAndMetadata(offset=1, metadata=''): UnknownTopicOrPartitionError
...

But I couldn't handle this, because it didn't raise any exceptions. So I checked the code.

https://github.com/aio-libs/aiokafka/blob/724e8105ce0163b2b2df176af65605aafd3ed060/aiokafka/consumer/group_coordinator.py#L1146-L1149

For UnknownTopicOrPartitionError, it only logs a warning and does not raise. How can I handle this exception? If I could handle it, I would do something like this:

...
        try:
            result = await consumer.getmany(timeout_ms=500)
        except UnknownTopicOrPartitionError as e:
            consumer.unsubscribe()
            consumer.subscribe(pattern='email.service.*')
...

Maybe this is an XY problem. If there is a better way to subscribe to topics dynamically, please let me know! Thanks!
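One detail about the pattern itself, as a minimal sketch using only the standard library. It assumes the subscription pattern is compiled as a Python regular expression and matched from the start of the topic name (an assumption about the client, not confirmed in this thread). Under that assumption, email.service.* is looser than it looks, because each unescaped dot matches any character. The topic names below are made up for illustration.

```python
import re

# Assumption: the client compiles the pattern as a regex and uses match(),
# which anchors only at the start of the topic name.
loose = re.compile("email.service.*")
strict = re.compile(r"email\.service\..+")

# Hypothetical topic names, for illustration only.
topics = [
    "email.service.order",
    "email.service",
    "emailXserviceY",
    "sms.service.order",
]

loose_matches = [t for t in topics if loose.match(t)]
strict_matches = [t for t in topics if strict.match(t)]

print(loose_matches)
print(strict_matches)
```

Escaping the dots keeps the subscription limited to topics that really start with the literal prefix email.service. followed by at least one more character.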

ods commented 2 years ago

Would handling the exception this way actually fix the problem? You won't receive an exception when a new topic is created.

dongho-jung commented 2 years ago

Yeah, but if a topic I'm subscribed to is deleted, I get a lot of warning messages.

ods commented 2 years ago

Do you just need to silence those warnings? That can be achieved by adding a filter to the corresponding logger.
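The logger filter suggested here could look roughly like the sketch below. The logger name is an assumption: it is derived from the module path in the linked source (aiokafka/consumer/group_coordinator.py) and presumes that module logs via logging.getLogger(__name__). The class name is made up for illustration, and the match is on the message text shown in the log output above.

```python
import logging

# Assumption: the messages are emitted by the module linked above and that
# module creates its logger with logging.getLogger(__name__).
COORDINATOR_LOGGER = "aiokafka.consumer.group_coordinator"


class DropDeletedTopicCommitErrors(logging.Filter):
    """Drop 'OffsetCommit failed ... UnknownTopicOrPartitionError' records."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        is_noise = (
            "OffsetCommit failed" in message
            and "UnknownTopicOrPartitionError" in message
        )
        # Returning False tells the logging module to discard the record.
        return not is_noise


logging.getLogger(COORDINATOR_LOGGER).addFilter(DropDeletedTopicCommitErrors())
```

A filter attached to a logger only sees records logged directly on that logger, so other messages from the library are unaffected. Note that this only hides the messages; the consumer still keeps trying to commit offsets for the deleted topic.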

eliax1996 commented 1 year ago

This is also a problem for me. When a topic is deleted, we should be able to remove it from the list of topics whose metadata we query from Kafka. For long-running instances, this problem can consume a lot of CPU and log resources.

I'm thinking of resetting the list of topics by unsubscribing and then subscribing again to the same pattern. (In my case it's even harder, because my library supports different types of subscription: a list of topics to observe, a pattern to subscribe to, or even an explicit assignment of topic partitions.)

Currently I'm handling it by adding a custom listener during the registration:

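from typing import List

from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener, TopicPartition


class TopicDeletionEventGenerator(ConsumerRebalanceListener):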
    def __init__(self):
        self._consumer = None

    def set_consumer(self, consumer: AIOKafkaConsumer):
        self._consumer = consumer

    # as the documentation says, we are guaranteed to be called when a topic is deleted
    async def on_partitions_revoked(self, revoked: List[TopicPartition]):
        assert self._consumer is not None

        available_topics = await self._consumer.topics()
        revoked_topics = [topic for topic, _ in revoked]
        for revoked_topic in revoked_topics:
            if revoked_topic not in available_topics:
                self._topic_deleted()
                break

    def _topic_deleted(self):
        self._consumer.unsubscribe()
        # followed by something like:
        # self._consumer.subscribe(topics, self)

    def on_partitions_assigned(self, assigned: List[TopicPartition]):
        pass

IMO this should be managed by the library itself, which should raise an error during the next poll, such as TopicDeletedException.
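Until something like that exists in the library, the "raise on the next poll" idea can be approximated at application level. The following is only a sketch: TopicDeletedError and DeletedTopicTracker are hypothetical names, not part of aiokafka, and TopicPartition here is a stand-in namedtuple with the same two fields so the example runs without a broker.

```python
from collections import namedtuple
from typing import Iterable, Set

# Stand-in with the same fields as aiokafka's TopicPartition, so the sketch
# runs without the library or a broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class TopicDeletedError(Exception):
    """Hypothetical application-level error; not part of aiokafka."""

    def __init__(self, topics: Set[str]):
        super().__init__(f"subscribed topics were deleted: {sorted(topics)}")
        self.topics = topics


class DeletedTopicTracker:
    """Remembers deleted topics until the polling loop asks about them."""

    def __init__(self) -> None:
        self._deleted: Set[str] = set()

    def note_revoked(
        self, revoked: Iterable[TopicPartition], available_topics: Set[str]
    ) -> None:
        # Intended to be called from on_partitions_revoked, passing the
        # set of topics the cluster currently reports.
        self._deleted |= {tp.topic for tp in revoked} - set(available_topics)

    def raise_if_deleted(self) -> None:
        # Intended to be called around each poll in the consuming loop.
        if self._deleted:
            deleted, self._deleted = self._deleted, set()
            raise TopicDeletedError(deleted)
```

The polling loop can then catch TopicDeletedError in one place and decide how to resubscribe for its subscription type, instead of calling unsubscribe() from inside the rebalance callback.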