robinhood / faust

Python Stream Processing

asyncio.exceptions.CancelledError and consumer stops #772

Open lachuta opened 1 year ago

lachuta commented 1 year ago

Checklist

Steps to reproduce

I have not been able to reproduce this.

Expected behavior

Consumers should not stop processing.

Actual behavior

I have a consumer deployed on 4 hosts, consuming from a topic with 16 partitions (4 per host). I have seen the consumer hit the exception below and stop consuming messages; the logs from that moment are under "Full traceback".

Some consumer configurations:

broker_heartbeat_interval: 6
broker_request_timeout: 180
broker_session_timeout: 120
broker_max_poll_records: 300
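
As a quick sanity check on the reported settings, here is a minimal sketch that tests them against two commonly cited rules of thumb: the heartbeat interval should be no more than one third of the session timeout, and the request timeout should not be shorter than the session timeout. It assumes the values are in seconds. The helper `check_broker_timeouts` is hypothetical and not part of Faust.

```python
# Hypothetical helper, not part of Faust: sanity-check the timeout settings.
# Values are the ones reported in this issue, assumed to be in seconds.
settings = {
    "broker_heartbeat_interval": 6,
    "broker_request_timeout": 180,
    "broker_session_timeout": 120,
    "broker_max_poll_records": 300,
}


def check_broker_timeouts(cfg):
    """Return a list of human-readable warnings (empty if none apply)."""
    warnings = []
    heartbeat = cfg["broker_heartbeat_interval"]
    session = cfg["broker_session_timeout"]
    request = cfg["broker_request_timeout"]
    # Rule of thumb: heartbeat interval <= one third of the session timeout.
    if heartbeat > session / 3:
        warnings.append("heartbeat interval exceeds 1/3 of session timeout")
    # The request timeout should not be shorter than the session timeout.
    if request < session:
        warnings.append("request timeout is shorter than session timeout")
    return warnings


problems = check_broker_timeouts(settings)
print(problems or "timeout settings look mutually consistent")
```

With the values from this issue neither check fires, so these two rules alone do not point to a misconfiguration. That does not rule out other causes.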

Full traceback

2022-11-29T15:20:36.894Z mss_sa_refresh_stream_processor INFO [request_id={173e3da6-6ee3-4fa9-ba16-460a14961ba8} transaction_id={5443682e-7eb8-4012-8f8b-8efe6cca45cc} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6220141. Duration: 71 ms
2022-11-29T15:20:36.895Z mss_sa_refresh_stream_processor INFO [request_id={03336c53-750c-4520-a4d9-638493a068ec} transaction_id={739ee30b-d420-451e-9aba-b92f6b6597b9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6191034
2022-11-29T15:20:36.968Z mss_sa_refresh_stream_processor INFO [request_id={03336c53-750c-4520-a4d9-638493a068ec} transaction_id={739ee30b-d420-451e-9aba-b92f6b6597b9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6191034. Duration: 73 ms
2022-11-29T15:20:36.969Z mss_sa_refresh_stream_processor INFO [request_id={04195a4c-c944-44b0-8e4a-e9cdf91ecc9f} transaction_id={72d56cb5-a0df-4b7f-b775-c64ee024e2f9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6190948
2022-11-29T15:20:37.042Z mss_sa_refresh_stream_processor INFO [request_id={04195a4c-c944-44b0-8e4a-e9cdf91ecc9f} transaction_id={72d56cb5-a0df-4b7f-b775-c64ee024e2f9} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6190948. Duration: 74 ms
2022-11-29T15:20:37.042Z mss_sa_refresh_stream_processor INFO [request_id={4f58da8f-4422-444b-9aca-2772e00b3778} transaction_id={f74748f1-845d-46b1-aa1c-8f90ae1956a3} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6221813
2022-11-29T15:20:37.138Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Exception in thread
2022-11-29T15:20:37.138Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Thread-1
2022-11-29T15:20:37.139Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] :
2022-11-29T15:20:37.139Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
2022-11-29T15:20:37.140Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/utils/locks.py", line 76, in wait
2022-11-29T15:20:37.140Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] await fut
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] asyncio.exceptions
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] .
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] CancelledError
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] The above exception was the direct cause of the following exception:
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
2022-11-29T15:20:37.141Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.run()
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 66, in run
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.service._start_thread()
2022-11-29T15:20:37.142Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.thread_loop.run_until_complete(self._serve())
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 603, in run_until_complete
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self.run_forever()
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 570, in run_forever
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] self._run_once()
2022-11-29T15:20:37.143Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 1859, in _run_once
2022-11-29T15:20:37.145Z mss_sa_refresh_stream_processor INFO [request_id={4f58da8f-4422-444b-9aca-2772e00b3778} transaction_id={f74748f1-845d-46b1-aa1c-8f90ae1956a3} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6221813. Duration: 102 ms
2022-11-29T15:20:37.146Z mss_sa_refresh_stream_processor INFO [request_id={d4ddb314-1b8f-4bc3-9508-a9ca6f5ebabd} transaction_id={8ecb1508-5719-4687-9b08-ed9018054797} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] Start processing event KEY: 6434463
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] handle._run()
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] SystemError
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] :
2022-11-29T15:20:37.150Z mss_sa_refresh_stream_processor WARNING [logger={mode.redirect}] PyEval_EvalFrameEx returned a result with an error set
2022-11-29T15:20:37.222Z mss_sa_refresh_stream_processor INFO [request_id={d4ddb314-1b8f-4bc3-9508-a9ca6f5ebabd} transaction_id={8ecb1508-5719-4687-9b08-ed9018054797} logger={mss_sa_refresh_stream_processor.svc_loc_refresh.agents}] End processing event KEY: 6434463. Duration: 77 ms
2022-11-29T15:25:37.943Z mss_sa_refresh_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Warning: Task timed out!
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Please make sure it's hanging before restart.
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] (started at Tue Nov 29 15:20:37 2022) Replaying logs...
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] (Tue Nov 29 15:20:37 2022) +consumer.commit()
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] -End of log-
2022-11-29T15:25:37.944Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-7113] Task traceback
2022-11-29T15:25:37.946Z mss_sa_refresh_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Stack for <Task pending name='<coroutine object Consumer._commit_handler at 0x7f88c9e183c0>' coro=<Service._execute_task() running at /opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py:779> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88bacfd340>()]> cb=[Service._on_future_done()]> (most recent call last):
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 783, in _commit_handler
    await self.commit()
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 822, in commit
    return await self.force_commit(
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 857, in force_commit
    did_commit = await self._commit_tps(
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 874, in _commit_tps
    return await self._commit_offsets(
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 943, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1311, in _commit
    return await self._thread.commit(offsets)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in commit
    return await self.call_thread(self._commit, offsets)
  File "/opt/abc/mss-sa-refresh-stream-processor/releases/20221122T162351/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
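
The first traceback in the logs ends in `asyncio.exceptions.CancelledError`, raised at an `await fut`. The sketch below does not reproduce this issue. It only illustrates, with made-up names, how cancellation surfaces on Python 3.8 and later, where `CancelledError` derives from `BaseException` and so bypasses `except Exception` handlers.

```python
import asyncio


async def wait_on_future(fut, seen):
    """Await a future, recording which except clause handles cancellation."""
    try:
        await fut
    except Exception:
        # Not reached on Python 3.8+: CancelledError is not an Exception.
        seen.append("Exception")
    except asyncio.CancelledError:
        seen.append("CancelledError")
        raise  # re-raise so the task is still marked as cancelled


async def main():
    seen = []
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # never resolved; stands in for a pending call
    task = asyncio.ensure_future(wait_on_future(fut, seen))
    await asyncio.sleep(0)  # let the task start and block on `await fut`
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen, task.cancelled()


seen, cancelled = asyncio.run(main())
print(seen, cancelled)
```

On Python 3.8 or later this prints `['CancelledError'] True`: the generic `except Exception` branch never sees the cancellation, so code that relies on it to log or recover from failures will not observe a cancelled wait.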

Versions

wbarnha commented 1 year ago

Try https://github.com/faust-streaming/faust; we actively maintain that fork.