faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Faust agents die after some days #523

Open alihoseiny opened 1 year ago

alihoseiny commented 1 year ago

Actual behavior

After the application has been running for some days, the agents slowly die, and eventually no events are consumed from the topics. If I do not restart the Docker container of the Faust app, all agents stop consuming after a few days.

Full traceback

* m_consumer.m_agent ----->
============================================================
['Stack for <coroutine object movie_updated_agent at 0x7f303428bce0> (most recent call last):\n  File "/project/m_consumer/movie_updated_consumer.py", line 22, in movie_updated_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']

* z_consumer.retry_send_product_agent ----->
============================================================
['Stack for <coroutine object retry_send_product_agent at 0x7f30341506b0> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f303428bf00> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f3034150af0> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f30341508d0> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f3034150490> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']

* o_consumer.o_agent ----->
============================================================
['Stack for <coroutine object order_agent at 0x7f303419a750> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b3450> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034175d50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b2150> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034199450> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b0e50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341c4850> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in 
stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034198150> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f303419ba50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034174b50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']

* z.z_consumer.send_user_agent ----->
============================================================
['Stack for <coroutine object send_user_agent at 0x7f30341f8710> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8dd0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9010> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8830> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8ef0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f95b0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8950> (most recent call last):\n  File "/project/z_consumer/consumer.py", 
line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9130> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f96d0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f84d0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8a70> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9250> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f85f0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", 
line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f97f0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8b90> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9370> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f303427e7b0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8170> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8cb0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9490> (most recent call 
last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']

* zb_core.transport_layer.zb_product_sent_consumer.consumer.send_product_agent ----->
============================================================
['Stack for <coroutine object send_product_agent at 0x7f3034108dd0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108050> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108710> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109250> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, 
in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108ef0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108170> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108830> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 
34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109370> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109010> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108950> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108290> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341fb770> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108b90> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", 
line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341083b0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108a70> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341fbe30> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109130> (most recent call last):\n  File 
"/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341084d0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108cb0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341085f0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  
File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n']

Sample code

Sample code of one of the consumers:

@app.agent(event_sent_topic, concurrency=20)
async def send_event_agent(stream):
    """"""
    task_name = asyncio.current_task().get_name()
    interface = Interface()

    async for records in stream.take(10, 1):
        for record in records:
            send_event_tasks_queue[record.user_id].append(task_name)
            await _wrap_send_event(record=record, interface=interface)

        await asyncio.sleep(0)  # Skipping current event loop run for giving execution chance to other tasks.

Versions

alihoseiny commented 1 year ago

For extra context, we also have the following warning logs:

[2023-06-15 08:38:54,210] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.297 seconds
[2023-06-15 08:38:54,462] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.336312010884285 runtime=4.301220178604126e-05 sleeptime=1.336312010884285
[2023-06-15 08:38:54,838] [1] [WARNING] Executing <Task pending name='Task-190' coro=<Agent._execute_actor() running at /usr/local/lib/python3.11/site-packages/faust/agents/agent.py:674> cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/site-packages/faust/agents/agent.py:664> took 0.102 seconds
[2023-06-15 08:38:55,500] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.281 seconds
[2023-06-15 08:38:55,768] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.30657482892274857 runtime=4.564225673675537e-05 sleeptime=1.3065748289227486
[2023-06-15 08:38:56,789] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.280 seconds
[2023-06-15 08:38:58,079] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.283 seconds
[2023-06-15 08:38:59,384] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.296 seconds
[2023-06-15 08:38:59,629] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.3187522292137146 runtime=4.420429468154907e-05 sleeptime=1.3187522292137146
[2023-06-15 08:39:00,682] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.290 seconds
[2023-06-15 08:39:00,933] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.30384987592697144 runtime=5.805492401123047e-05 sleeptime=1.3038498759269714
[2023-06-15 08:39:01,969] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.277 seconds
[2023-06-15 08:39:03,264] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.289 seconds
[2023-06-15 08:39:04,560] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.287 seconds
[2023-06-15 08:39:05,851] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.283 seconds
[2023-06-15 08:39:06,858] [1] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks

[2023-06-15 08:55:59,973] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.32391348481178284 runtime=4.1179358959198e-05 sleeptime=1.3239134848117828
[2023-06-15 08:56:01,013] [1] [WARNING] Executing <Task pending name='Task-763' coro=<AIOKafkaConnection._read() running at /usr/local/lib/python3.11/site-packages/aiokafka/conn.py:525> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[AIOKafkaConnection._on_read_task_error(<weakref at 0...x7f30230ac330>)()] created at /usr/local/lib/python3.11/site-packages/aiokafka/util.py:26> took 0.291 seconds
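
For context on the "Executing <Task ...> took N seconds" warnings: asyncio emits them in debug mode whenever a single step of a task or callback holds the event loop longer than loop.slow_callback_duration (0.1 seconds by default). Below is a minimal, standard-library-only sketch that reproduces one such warning. The names blocking_step and run_demo are hypothetical, and the blocking time.sleep only stands in for whatever synchronous work runs inside the real agents.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log messages in a list so they can be inspected afterwards."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_step(seconds):
    # time.sleep() blocks the whole event loop; no other task can run meanwhile.
    time.sleep(seconds)


def run_demo(block_seconds=0.2):
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True enables the slow-callback check; the threshold is
        # loop.slow_callback_duration, which defaults to 0.1 seconds.
        asyncio.run(blocking_step(block_seconds), debug=True)
    finally:
        logger.removeHandler(handler)
    return [m for m in handler.messages if m.startswith("Executing")]


slow_warnings = run_demo()
print(slow_warnings)
```

Each warning of this kind means the event loop could not serve heartbeats, fetches, or other agents for that long, so many of them in a row point to synchronous work somewhere in the loop.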

wbarnha commented 1 year ago

Likely a duplicate of https://github.com/faust-streaming/faust/issues/175 😦. If I figure out a solution, I'll immediately have a fix deployed.

dada-engineer commented 1 year ago

@wbarnha I am not sure this is related, but we had an issue where our broker was unavailable for some time. The heartbeat then failed and the coordinator was marked as dead. After some time, when the broker comes back to life, aiokafka starts a new coordinator and should rejoin the group. Then all TopicPartitions are seeked to the last committed offset (although the Faust leader has no last committed offset, so it is seeked to 0). After that we see "Adding Fetch request" logs and no errors, but nothing is consumed and the offset never advances, even though our consumer is about 60 messages behind. Do you have any idea?

Is the issue that our group member is marked as dead, comes back to life, and is then never unmarked as dead, so Kafka does not rebalance and assign anything to the consumer? We only have one consumer and the group is marked as empty for us, so a new join should do the trick, I guess...

Edit: It looks like at some point all topics except the ._assignor_leader are dropped, and no fetch request is sent after that. It seems to happen after the next new message is detected.

Edit2: Maybe we pause a partition somewhere and never resume it. I just can't find why, where, or when.

wbarnha commented 1 year ago

Thank you for your thorough notes; I'll need to come back and review this. I wish there were more unit tests to ensure all of Faust's coordinators behave properly, so I could understand which parts of them are shaky.

dada-engineer commented 1 year ago

Yes, sorry, I haven't been able to wrap my head around this part yet. Thanks for your work ☺️

dada-engineer commented 1 year ago

What's also strange is that I see logs in the following order:

First, a fetch request for the topic with highwater and last_stable_offset 694. After that, a log saying records were added for this topic, with offset 693.

Can this inconsistency stop the consumption, as we are waiting for 694 and only get 693? After those logs there are no more fetch requests for this specific topic, and no other logs except metadata requests.
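
On the 694 versus 693 question, here is a small sketch. It assumes these logs follow the usual Kafka convention, in which the highwater mark is the offset that the next produced record will receive (one past the newest existing record). The helper names are hypothetical.

```python
def next_fetch_offset(last_consumed_offset):
    """Offset the consumer requests after consuming `last_consumed_offset`."""
    return last_consumed_offset + 1


def consumer_lag(highwater, last_consumed_offset):
    """Number of records in the partition that have not been consumed yet.

    `highwater` is assumed to be the offset of the next record to be
    produced, i.e. one past the offset of the newest existing record.
    """
    return highwater - next_fetch_offset(last_consumed_offset)


# Numbers taken from the log lines described above.
highwater = 694
last_record_offset = 693

print(next_fetch_offset(last_record_offset))        # 694
print(consumer_lag(highwater, last_record_offset))  # 0
```

Under that assumption the two log lines agree. Having received offset 693, the consumer's next fetch position is 694, which equals the highwater mark, so the partition is fully consumed at that moment rather than stuck one record short.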

dada-engineer commented 1 year ago

Found some discussions about this in the underlying aiokafka lib that might be related:

https://github.com/aio-libs/aiokafka/issues/625
https://github.com/aio-libs/aiokafka/issues/575
https://github.com/aio-libs/aiokafka/issues/727

Maybe we should check our on_partitions_revoked callback.

dada-engineer commented 1 year ago

Within the aiokafka documentation there is this part:

"It is quite critical to provide ConsumerRebalanceListener if you need to control rebalance start and end moments. In that case set the rebalance_timeout_ms to the maximum time your application can spend waiting in the callback. If your callback waits for the last getmany() result to be processed, it is safe to set this value to max_poll_interval_ms, same as in Java client."

The corresponding Faust settings are broker_rebalance_timeout and broker_session_timeout, both of which default to 60 seconds.

On rebalance, by default we wait for the stream to be empty. If this takes longer than the broker rebalance timeout or the session timeout, our consumer is considered dead. By default the stream buffer holds up to 4096 unprocessed items (stream_buffer_maxsize), so make sure you can process that many within 60 seconds.
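
The 4096-items-in-60-seconds budget can be checked with a bit of arithmetic. The following is a rough sketch, not Faust code. It assumes a constant processing time per event and a full buffer at rebalance time. The function names and the example timings are hypothetical, while the dictionary keys are the Faust setting names mentioned above.

```python
# Values as described above (Faust defaults according to this thread).
faust_settings = {
    "broker_session_timeout": 60.0,    # seconds
    "broker_rebalance_timeout": 60.0,  # seconds
    "stream_buffer_maxsize": 4096,     # unprocessed events
}


def drain_seconds(buffered_events, seconds_per_event, workers=1):
    """Estimate the time needed to empty the buffer at a constant cost."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    return buffered_events * seconds_per_event / workers


def drains_in_time(settings, seconds_per_event, workers=1):
    """True if a full buffer can be processed before either timeout expires."""
    budget = min(settings["broker_session_timeout"],
                 settings["broker_rebalance_timeout"])
    needed = drain_seconds(settings["stream_buffer_maxsize"],
                           seconds_per_event, workers)
    return needed <= budget


# Per-event budget for one worker: 60 s / 4096 events, about 14.6 ms.
per_event_budget_ms = round(60.0 / 4096 * 1000, 1)
print(per_event_budget_ms)                    # 14.6
print(drains_in_time(faust_settings, 0.010))  # True  (40.96 s needed)
print(drains_in_time(faust_settings, 0.050))  # False (204.8 s needed)
```

If the check fails for your measured per-event time, the two obvious knobs are raising the timeouts or lowering stream_buffer_maxsize. These keys can be passed as keyword arguments when constructing the Faust app.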

So, for anyone having this issue: could you check whether this is the source of the failure for you?