aio-libs / aiokafka

asyncio client for Kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Consumer stopped consuming, task Fetcher._fetch_task has finished #983

Closed: pavelschon closed this issue 1 month ago

pavelschon commented 4 months ago

Describe the bug
Under unclear conditions, our consumer stopped receiving messages. We investigated: when this happens, _fetch_task is in the finished state, even though the Fetcher object is still alive (Fetcher.close() was not called).

 '_fetch_task': <Task finished name='Task-29' coro=<Fetcher._fetch_requests_routine() done, defined at /opt/poetry/venv/lib/python3.12/site-packages/aiokafka/consumer/fetcher.py:457> result=None>,

We suspect the issue is in _fetch_requests_routine(): it handles CancelledError incorrectly. According to the Python documentation, CancelledError should be re-raised, but this routine simply suppresses it.
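
For illustration, here is a minimal sketch of the suspected anti-pattern next to the documented behavior (hypothetical function names, not the actual aiokafka code):

import asyncio

async def routine_swallowing_cancel():
    try:
        await asyncio.sleep(1000)
    except asyncio.CancelledError:
        pass  # suspected bug: the task ends up "finished", not "cancelled"

async def routine_reraising_cancel():
    try:
        await asyncio.sleep(1000)
    except asyncio.CancelledError:
        raise  # documented behavior: the task ends up "cancelled"

async def main():
    for coro in (routine_swallowing_cancel, routine_reraising_cancel):
        task = asyncio.create_task(coro())
        await asyncio.sleep(0)  # let the task start
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        print(coro.__name__, "cancelled:", task.cancelled())

asyncio.run(main())

The swallowing variant prints cancelled: False and leaves the task in the same "Task finished ... result=None" state as the repr above; the re-raising variant prints cancelled: True.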

Environment (please complete the following information):

AxTheB commented 4 months ago

When we replaced the fetch task with a fresh one, it did not receive new messages either (there were messages in the assigned partition) and finished too, without logging anything. I see only one way for this task to reach the finished state, and that is by swallowing the CancelledError. If the error were propagated, the app could restart and quickly resume consuming messages.
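
For reference, a rough sketch of what replacing the fetch task could look like, based on the coroutine name visible in the task repr above (a guess at the approach; _fetcher, _fetch_task and _fetch_requests_routine are private aiokafka internals, not public API):

import asyncio

def replace_fetch_task(consumer):
    # Hypothetical helper: re-create the private fetch task from the
    # same coroutine that backs it. Relies on aiokafka internals.
    fetcher = consumer._fetcher
    fetcher._fetch_task = asyncio.create_task(
        fetcher._fetch_requests_routine()
    )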

ods commented 4 months ago

The only place where _fetch_task is cancelled is the Fetcher.close() method. Or do you see other options? Also, for this exception to propagate, you have to await the task, which is likewise done only in the close() method.
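
As a side note, a task's exception stays stored on the task and surfaces only when the task is awaited or its result is inspected. A minimal illustration (hypothetical names):

import asyncio

async def boom():
    raise RuntimeError("failed inside the task")

async def main():
    task = asyncio.create_task(boom())
    await asyncio.sleep(0.1)  # the task has already failed by now
    try:
        await task  # the stored exception is re-raised only here
    except RuntimeError as exc:
        print("visible only when awaited:", exc)

asyncio.run(main())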

ods commented 4 months ago

Hmm, there is another way to get a CancelledError here:

                    for task in self._pending_tasks:
                        # Those tasks should have proper handling for
                        # cancellation
                        if not task.done():
                            task.cancel()
                        await task
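
To spell out why this matters: awaiting a task that was just cancelled re-raises the CancelledError in the awaiting coroutine itself, not only inside the cancelled task. A minimal illustration (hypothetical names):

import asyncio

async def pending():
    await asyncio.sleep(1000)

async def routine():
    task = asyncio.create_task(pending())
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # The error escapes into the awaiting coroutine, which is
        # exactly the path by which _fetch_requests_routine() can
        # receive a CancelledError without Fetcher.close() being called.
        print("CancelledError reached the awaiting coroutine")

asyncio.run(routine())
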
pavelschon commented 4 months ago

Can you confirm this is a bug in aiokafka?

On our side, we had to implement a workaround:

if consumer._fetcher._fetch_task.done():
    # restart app
    ...
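
For context, such a check would typically live in a small watchdog coroutine; the polling interval and the restart callback below are hypothetical, not from the issue:

import asyncio

async def watch_fetch_task(consumer, restart, interval=5.0):
    # Hypothetical watchdog: poll the private _fetch_task and trigger
    # an application-specific restart if it ever finishes while the
    # consumer is still supposed to be running.
    while True:
        await asyncio.sleep(interval)
        if consumer._fetcher._fetch_task.done():
            await restart()
            return
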
apmorton commented 1 month ago

(originally posted this on #847, but on further investigation I think this is the correct issue)

@ods I have bisected this issue to #802

Consider the following script:

import asyncio
import logging
import time

import aiokafka

async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()

async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        # Block the event loop long enough for the heartbeat session
        # to expire and the in-flight fetch request to time out.
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )

async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()

        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))

        consumer._fetcher._fetch_task.add_done_callback(lambda t: print('fetch task done'))

        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")

if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())

Prior to #802 this will print:

sleeping
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
ConsumerRecord(topic='some_topic', partition=0, offset=10, timestamp=1715232041461, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())

After #802 this will print:

sleeping
inducing lag
sending message
Failed fetch messages from 0: [Error 7] RequestTimedOutError
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
fetch task done

Notice that the ConsumerRecord is never printed, while fetch task done is.

If you turn on debug logging, you will see that after the induced lag this message stops appearing:

DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition

No new messages will ever be received by consume_task after this point.

Reverting this single line from #802 restores the original behavior:

diff --git a/aiokafka/conn.py b/aiokafka/conn.py
index da27fd2..2ceb9ba 100644
--- a/aiokafka/conn.py
+++ b/aiokafka/conn.py
@@ -450,7 +450,7 @@ class AIOKafkaConnection:
             return self._writer.drain()
         fut = self._loop.create_future()
         self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
-        return wait_for(fut, self._request_timeout)
+        return asyncio.wait_for(fut, self._request_timeout)

     def _send_sasl_token(self, payload, expect_response=True):
         if self._writer is None:
ods commented 1 month ago

Hi @apmorton, thank you for reproducing the problem. Right, starting with Python 3.8.6 there was a bug in asyncio.wait_for(), which was fixed in 3.12 by using the same approach as the one used here. It is no surprise that a bug that fails to handle an exception and another that swallows it can compensate for each other.
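
For context, the swallowing side of that pair looked roughly like this in CPython 3.8.6 through 3.11 (a simplified sketch of the old asyncio.wait_for(), see bpo-42130, not the exact source): when the inner future completed in the same event-loop step in which wait_for() itself was cancelled, the result was returned and the CancelledError was lost:

try:
    await waiter
except exceptions.CancelledError:
    if fut.done():
        # bpo-42130: the outer cancellation is swallowed here
        return fut.result()
    fut.remove_done_callback(cb)
    await _cancel_and_wait(fut, loop=loop)
    raise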

ods commented 1 month ago

Other similar places where we may also have problems:

ods commented 1 month ago

Here is a snippet demonstrating the problem with simply suppressing CancelledError:

import asyncio
from time import time

async def task_with_cleanup():
    try:
        await asyncio.sleep(1000)
    finally:
        # Deliberately slow cleanup that runs on cancellation.
        print("Cleanup", time() - started)
        await asyncio.sleep(2)

async def worker():
    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Suppressing the error here also swallows the cancellation of
        # worker() itself while it is awaiting the cancelled task.
        print("CancelledError is suppressed", time() - started)
    await asyncio.sleep(10)

async def main():
    # wait_for() tries to cancel worker() after 1 second; because the
    # cancellation is swallowed, worker() keeps running and the
    # assertion below fails.
    await asyncio.wait_for(worker(), timeout=1)
    elapsed = time() - started
    assert elapsed < 2, elapsed

started = time()
asyncio.run(main())