Closed bunengxiu closed 1 year ago
Bug using aiokafka consumer getmany method, error : 'NoneType' object has no attribute 'check_errors'
Raise coordination errors if any self._coordinator.check_errors() # self._coordinator is None
producer code as blow:
from aiokafka import AIOKafkaProducer import asyncio async def producer(): p = AIOKafkaProducer(bootstrap_servers='localhost:9092') await p.start() try: await p.send("my_topic", b"my_topic") finally: await p.stop() if __name__ == '__main__': asyncio.run(producer())
consumer code as blow:
import asyncio from aiokafka import AIOKafkaConsumer async def consumer(): consume = AIOKafkaConsumer( 'my_topic', bootstrap_servers='localhost:9092', group_id="my-group", auto_offset_reset='earliest', ) try: data = await consume.getmany() for tp, msgs in data.items(): print(tp.topic, tp.partition) for msg in msgs: print(msg.offset, msg.key, msg.value) except Exception as e: print(e) finally: print('finally') await consume.stop() if __name__ == '__main__': asyncio.run(consumer())
Environment:
Bug using aiokafka consumer getmany method, error : 'NoneType' object has no attribute 'check_errors'
Raise coordination errors if any self._coordinator.check_errors() # self._coordinator is None
producer code as blow:
consumer code as blow:
Environment: