aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

`assert assignment is not None and assignment.active` raising incorrectly #796

Open cdgriffith opened 2 years ago

cdgriffith commented 2 years ago

Describe the bug

I have a program that runs two listener consumers: one is told when to start a command, and the other is alerted when a command has finished.

The start-command consumer launches a new consumer for each new topic command as needed. However, this seems to raise assertion errors.

If I remove that assertion line, the program runs as expected. Am I missing a core concept of how this is supposed to work, or is this a bug?

Thanks for any help or pointers!

Error logs

```
INFO: Started server process [11818]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:3333 (Press CTRL+C to quit)
2021-11-17 15:15:21.502 | DEBUG | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=2, timestamp=1637183719944, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
2021-11-17 15:15:22.090 | DEBUG | __main__:consume_message:132 - Reading test message at offset 0 : {TopicPartition(topic='my_topic_prefix.test.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=0, timestamp=1637183621283, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=()), ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=1, timestamp=1637183642792, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=()), ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=2, timestamp=1637183719944, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:15:22.091 | INFO | __main__:consume_message:136 - {'topic': 'test'}
Unexpected error in coordinator routine
Traceback (most recent call last):
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 555, in _coordination_routine
    await self.__coordination_routine()
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 608, in __coordination_routine
    assert assignment is not None and assignment.active
AssertionError
2021-11-17 15:16:04.554 | DEBUG | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=3, timestamp=1637183763013, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
Task exception was never retrieved
future: exception=KafkaError('Unexpected error during coordination AssertionError()')>
Traceback (most recent call last):
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 555, in _coordination_routine
    await self.__coordination_routine()
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 608, in __coordination_routine
    assert assignment is not None and assignment.active
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test_fast.py", line 131, in consume_message
    msg = await consumer.getmany(timeout_ms=1000, max_records=5)
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 1181, in getmany
    self._coordinator.check_errors()
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 300, in check_errors
    self._coordination_task.result()
  File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 564, in _coordination_routine
    raise kafka_exc
kafka.errors.KafkaError: KafkaError: Unexpected error during coordination AssertionError()
```

Expected behavior

Removing the assert makes the code run as expected:

Expected logs

```
2021-11-17 15:46:05.227 | DEBUG | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=0, timestamp=1637185563622, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
2021-11-17 15:46:05.748 | DEBUG | __main__:consume_message:137 - Reading test message at offset 0 : {TopicPartition(topic='my_topic_prefix.test.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=0, timestamp=1637185563622, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:46:05.748 | INFO | __main__:consume_message:139 - High priority issue incoming
2021-11-17 15:46:05.749 | INFO | __main__:consume_message:142 - {'topic': 'test'}
2021-11-17 15:46:10.145 | DEBUG | __main__:finish_watcher:96 - Received completed topic: ConsumerRecord(topic='my_topic_prefix.finish', partition=0, offset=0, timestamp=1637185568621, timestamp_type=0, key=None, value=b'{"topic": "test", "offset": 0}', checksum=None, serialized_key_size=-1, serialized_value_size=30, headers=())
2021-11-17 15:46:11.177 | DEBUG | __main__:consume_message:137 - Reading test message at offset 1 : {}
2021-11-17 15:46:11.177 | INFO | __main__:consume_message:144 - No message, running background command instead
2021-11-17 15:46:20.154 | DEBUG | __main__:finish_watcher:96 - Received completed topic: ConsumerRecord(topic='my_topic_prefix.finish', partition=0, offset=1, timestamp=1637185578633, timestamp_type=0, key=None, value=b'{"topic": "test", "offset": 0}', checksum=None, serialized_key_size=-1, serialized_value_size=30, headers=())
2021-11-17 15:46:20.208 | DEBUG | __main__:consume_message:137 - Reading test message at offset 1 : {TopicPartition(topic='my_topic_prefix.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=1, timestamp=1637185573620, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:46:20.209 | INFO | __main__:consume_message:139 - High priority issue incoming
2021-11-17 15:46:20.209 | INFO | __main__:consume_message:142 - {'topic': 'test'}
```

Environment (please complete the following information):

Reproducible example

Service

```python
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import json
import os

import motor.motor_asyncio
import aiokafka
from fastapi import FastAPI
from loguru import logger
import uvicorn
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from starlette_context import context, plugins
from starlette_context.middleware import ContextMiddleware
from starlette.middleware import Middleware

load_dotenv()

topic_prefix = os.getenv('TOPIC_PREFIX', 'my_topic_prefix')
bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', "my_bootstrap_servers")
kafka_client_id = os.getenv("KAFKA_CLIENT_ID", "my_client_id")
mongo_uri = os.getenv("MONGO_URI", "my_mongo_uri")

middleware = [
    Middleware(
        ContextMiddleware,
        plugins=(
            plugins.RequestIdPlugin(),
            plugins.CorrelationIdPlugin()
        )
    )
]

app = FastAPI(debug=True, middleware=middleware)
app.add_middleware(CORSMiddleware, allow_origins=["*"])


@app.on_event("startup")
async def startup_event():
    context.consumers = {}
    context.shutdown_triggered = False

    context.launch_consumer = aiokafka.AIOKafkaConsumer(
        f"{topic_prefix}.launch",
        group_id="launch",
        client_id=f"{kafka_client_id}.launch",
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        auto_offset_reset="earliest",
    )
    await context.launch_consumer.start()

    context.finish_consumer = aiokafka.AIOKafkaConsumer(
        f"{topic_prefix}.finish",
        group_id="finish",
        client_id=f"{kafka_client_id}.finish",
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        auto_offset_reset="earliest",
    )
    await context.finish_consumer.start()

    asyncio.ensure_future(launch_watcher())
    asyncio.ensure_future(finish_watcher())

    context.mongo_client = motor.motor_asyncio.AsyncIOMotorClient(mongo_uri)
    context.mongo_db = context.mongo_client["mydb"]


@app.on_event("shutdown")
async def shutdown_event():
    context.shutdown_triggered = True
    for consumer in context.consumers.values():
        await consumer.stop()


async def launch_watcher():
    try:
        while not context.shutdown_triggered:
            async for command in context.launch_consumer:
                logger.debug(f"Received launch topic: {command}")
                parsed_topic_name = json.loads(command.value.decode("utf-8"))["topic"]
                asyncio.ensure_future(consume_message(parsed_topic_name))
    finally:
        await context.launch_consumer.stop()


async def finish_watcher():
    # While these do the same thing in this example,
    # in practice they consume commands from different services when starting / ending a command
    try:
        while not context.shutdown_triggered:
            async for message in context.finish_consumer:
                logger.debug(f"Received completed topic: {message}")
                parsed_message = json.loads(message.value.decode("utf-8"))
                # Do something to stop / cancel the running command, then get the next one available
                await command_done(parsed_message["topic"], parsed_message["offset"])
                asyncio.ensure_future(consume_message(parsed_message["topic"]))
    finally:
        await context.finish_consumer.stop()


async def ensure_consumer_started(topic_name: str):
    if topic_name in context.consumers:
        return context.consumers[topic_name]
    consumer = aiokafka.AIOKafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        client_id=kafka_client_id,
        group_id=f"my_program_{topic_name}",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        metadata_max_age_ms=30000,
    )
    await consumer.start()
    tp = aiokafka.TopicPartition(f"{topic_prefix}.{topic_name}.command", 0)
    consumer.assign([tp])
    context.consumers[topic_name] = consumer
    return consumer


async def consume_message(topic_name: str):
    consumer = await ensure_consumer_started(topic_name)
    # Grab offset from database
    offset_data = await context.mongo_db.offsets.find_one({"topic_name": topic_name})
    offset = offset_data["offset"] if offset_data else 0

    # Get next message without blocking
    tp = aiokafka.TopicPartition(f"{topic_prefix}.{topic_name}.command", 0)
    consumer.assign([tp])
    consumer.seek(partition=tp, offset=offset)
    msg = await consumer.getmany(timeout_ms=1000, max_records=5)
    logger.debug(f"Reading {topic_name} message at offset {offset} : {msg}")
    if msg:
        logger.info("High priority issue incoming")
        raw_message = next(iter(msg.values()))[0]
        data = json.loads(raw_message.value.decode("utf-8"))
        logger.info(data)
    else:
        logger.info("No message, running background command instead")
        # Would run alt command instead with await


async def command_done(topic_name, offset):
    offset_data = await context.mongo_db.offsets.find_one({"topic_name": topic_name})
    new_offset = {"topic_name": topic_name, "offset": offset + 1}
    if offset_data:
        await context.mongo_db.offsets.replace_one({'_id': offset_data["_id"]}, new_offset)
    else:
        await context.mongo_db.offsets.insert_one(new_offset)


if __name__ == '__main__':
    uvicorn.run(app, port=3333, debug=True)
```
Test producer

```python
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import json
import os

import aiokafka
from dotenv import load_dotenv

load_dotenv()

topic_prefix = os.getenv('TOPIC_PREFIX', 'my_topic_prefix')
bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', "my_bootstrap_servers")
kafka_client_id = os.getenv("KAFKA_CLIENT_ID", "my_client_id")


async def main():
    producer = aiokafka.AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id=kafka_client_id,
    )
    await producer.start()
    await producer.send(f'{topic_prefix}.test.command', json.dumps({"topic": "test"}).encode("utf-8"))
    await producer.send(f'{topic_prefix}.launch', json.dumps({"topic": "test"}).encode("utf-8"))
    await asyncio.sleep(5)
    await producer.send(f'{topic_prefix}.finish', json.dumps({"topic": "test", "offset": 0}).encode("utf-8"))
    await asyncio.sleep(5)
    await producer.send(f'{topic_prefix}.test.command', json.dumps({"topic": "test"}).encode("utf-8"))
    await asyncio.sleep(5)
    await producer.send(f'{topic_prefix}.finish', json.dumps({"topic": "test", "offset": 0}).encode("utf-8"))
    await producer.stop()


if __name__ == '__main__':
    asyncio.run(main())
```
cdgriffith commented 2 years ago

I was able to get past this for now by not setting the group_id for the consumer created in ensure_consumer_started.
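A minimal sketch of that workaround, assuming offsets are tracked outside Kafka (as the MongoDB `offsets` collection does in the service above), so no consumer group is needed. `manual_consumer_config` and `start_manual_consumer` are hypothetical helper names; the consumer arguments are the ones already used in `ensure_consumer_started`, minus `group_id`, and the partition is assigned once at startup instead of on every read.

```python
from typing import Any, Dict, Tuple


def manual_consumer_config(bootstrap_servers: str, client_id: str) -> Dict[str, Any]:
    """Keyword arguments for a manually assigned consumer with no consumer group.

    group_id is deliberately omitted: offsets are stored externally
    (e.g. in MongoDB), so no group coordination or offset commits are needed.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "client_id": client_id,
        "auto_offset_reset": "earliest",
        "enable_auto_commit": False,
        "metadata_max_age_ms": 30000,
    }


async def start_manual_consumer(topic: str, partition: int, offset: int,
                                config: Dict[str, Any]) -> Tuple[Any, Any]:
    """Start a consumer, assign a single partition once, and seek to `offset`."""
    # Imported lazily so the config helper above works without aiokafka installed.
    from aiokafka import AIOKafkaConsumer, TopicPartition

    consumer = AIOKafkaConsumer(**config)
    await consumer.start()
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])  # assign once here; later reads only seek() + getmany()
    consumer.seek(tp, offset)
    return consumer, tp
```

Later reads would then call `consumer.seek(tp, offset)` followed by `await consumer.getmany(timeout_ms=1000, max_records=5)` on the cached consumer, without calling `assign()` again.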

tvoinarovskyi commented 2 years ago

Thanks for reporting this in such detail. Looking through the code, I can confirm that it's a bug: there is a race condition with manual assignment that triggers the `assignment.active` assert. The assert is meant to guarantee that the code following it sees a consistent assignment, but assignment changes appear to be checked only in the auto-assigned case. I'll try to put together a fix.
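The race described here can be illustrated with a toy model. It does not reproduce aiokafka's internals; it only assumes that each `assign()` call replaces the current assignment object and marks the old one inactive, while a background coordinator task holds a reference to the old object across an `await`. `ToyAssignment`, `ToySubscriptionState`, `toy_coordinator_step`, and `demo` are all hypothetical names.

```python
import asyncio


class ToyAssignment:
    """Stand-in for a partition assignment; deactivated when replaced."""

    def __init__(self, partitions):
        self.partitions = frozenset(partitions)
        self.active = True


class ToySubscriptionState:
    """Each assign() call replaces the current assignment object."""

    def __init__(self):
        self.assignment = None

    def assign(self, partitions):
        if self.assignment is not None:
            self.assignment.active = False  # the old object is now stale
        self.assignment = ToyAssignment(partitions)


async def toy_coordinator_step(state: ToySubscriptionState) -> None:
    """Capture the assignment, yield to the loop, then check it is still current."""
    assignment = state.assignment
    await asyncio.sleep(0)  # any await lets other tasks run
    assert assignment is not None and assignment.active


async def demo(reassign_concurrently: bool) -> bool:
    """Return True if the toy coordinator's assert held, False if it failed."""
    state = ToySubscriptionState()
    state.assign([("my_topic_prefix.test.command", 0)])
    step = asyncio.create_task(toy_coordinator_step(state))
    await asyncio.sleep(0)  # let the step capture the current assignment
    if reassign_concurrently:
        # Same partition again, as consume_message() does on every call.
        state.assign([("my_topic_prefix.test.command", 0)])
    try:
        await step
        return True
    except AssertionError:
        return False
```

Under that assumption, any task that re-assigns while the coordinator is suspended (as overlapping `consume_message` calls from both watchers can do in the service above) invalidates the reference the coordinator holds, even when the partition set is unchanged. That would be consistent with assigning only once, or dropping `group_id` as in the workaround reported above, avoiding the assert.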

vejmoj1 commented 1 year ago

@tvoinarovskyi, is there any progress on fixing this issue?

AnimeshRy commented 7 months ago

@tvoinarovskyi I'm facing this same issue with a single consumer as well. What could be causing it?

marcelblijleven commented 7 months ago

I encountered this assertion error today too. In my case it was caused by calling `consumer.subscribe()` after the consumer had already been started with `await consumer.start()`. I'm leaving this here in case it helps anyone else running into this issue.

YraganTron commented 4 months ago

I have exactly the same problem (aiokafka version 0.7.2).

If I create the consumer with the topic and a group ID passed to the constructor, start it, and then also call `subscribe()` with the same topic, I get the same error:

```python
consumer = AIOKafkaConsumer('test_topic', group_id='blabla')

await consumer.start()
consumer.subscribe(['test_topic'])
# fails with: assert assignment is not None and assignment.active
```
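For the two `subscribe()` reports above, one defensive pattern is to pass the topics to the constructor only and skip any `subscribe()` call whose topic set is already current. This assumes, as those reports suggest, that a redundant `subscribe()` on a started consumer is what trips the assert. `subscribe_once` is a hypothetical helper that relies only on the consumer's `subscription()` and `subscribe(topics=...)` methods; a genuine change of topics is still a real subscription change and is not protected by this check.

```python
from typing import Any, Iterable


def subscribe_once(consumer: Any, topics: Iterable[str]) -> bool:
    """Call consumer.subscribe() only if the topic set actually changes.

    Returns True if subscribe() was called, False if it was skipped.
    """
    wanted = frozenset(topics)
    current = frozenset(consumer.subscription() or ())
    if current == wanted:
        return False  # already subscribed to exactly these topics
    consumer.subscribe(topics=sorted(wanted))
    return True
```

With the consumer from the snippet above, `subscribe_once(consumer, ['test_topic'])` after `await consumer.start()` would be skipped, because the constructor already subscribed it to `test_topic`.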