aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0
1.08k stars 224 forks source link

[QUESTION] Restarting `AIOKafkaConsumer` after `AIOKafkaConsumer.stop()` #1010

Open dom-gunstone opened 1 month ago

dom-gunstone commented 1 month ago

Hi,

I have a use case where I'm injecting a AIOKafkaConsumer instance into a class. The class has its own start and stop methods that, amongst other things, call the start and stop methods of the consumer class. The issue is that I'm failing the assert self._fetcher is None check in AIOKafkaConsumer.start() method.

Am I right in thinking this should work?

import asyncio

from aiokafka import AIOKafkaConsumer

async def main() -> None:
    consumer = AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")

    await consumer.start()
    await consumer.stop()
    await consumer.start()

if __name__ == "__main__":
    asyncio.run(main())

I suppose I could pass the class a function that creates the consumer instance, but it feels a bit less neat.

import asyncio

from aiokafka import AIOKafkaConsumer

async def main() -> None:
    def get_new_consumer() -> AIOKafkaConsumer:
        return AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")

    consumer = get_new_consumer()

    await consumer.start()
    await consumer.stop()

    consumer = get_new_consumer()

    await consumer.start()

if __name__ == "__main__":
    asyncio.run(main())
ods commented 1 month ago

No, consumer instance is not reusable, you have to recreate it.

dom-gunstone commented 1 month ago

@ods Thanks for confirming.

Is this something that could be implemented, or is it not possible?

ods commented 1 month ago

It's possible to fix this, but I wouldn't expect somebody to invest their time in it.