robinhood / faust

Python Stream Processing

Consumer crashes sporadically and stops consuming - Crashed reason=ConsumerStoppedError() #771

Status: Open. Opened by lachuta 1 year ago.

Steps to reproduce

I have not been able to reproduce this.

Expected behavior

The consumers keep running and consuming messages without crashing.

Actual behavior

I have a consumer deployed on 4 hosts, consuming from a topic with 16 partitions (4 per host). Two to three times a week, one of the consumers crashes and stops consuming messages, and the traceback below appears in the logs.

Relevant consumer settings:
- broker_heartbeat_interval: 6
- broker_request_timeout: 180
- broker_session_timeout: 120
- broker_max_poll_records: 300
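For reference, the reported settings can be written as the keyword arguments a Faust `App` accepts. The sketch below assumes the three timeout values are in seconds (the unit Faust uses for these settings) and checks them against the Kafka consumer guidance that the heartbeat interval be lower than the session timeout and typically no higher than one third of it. It does not reproduce the crash, and the helper name `heartbeat_within_guidance`, the app id, and the broker URL are hypothetical placeholders.

```python
# Reported consumer settings keyed by Faust setting name.
# Assumption: the timeout values are in seconds.
reported_settings = {
    "broker_heartbeat_interval": 6,
    "broker_request_timeout": 180,
    "broker_session_timeout": 120,
    "broker_max_poll_records": 300,  # a record count, not a duration
}


def heartbeat_within_guidance(settings: dict) -> bool:
    """Hypothetical helper: True if the heartbeat interval is at most one
    third of the session timeout, per the Kafka consumer guidance."""
    heartbeat = settings["broker_heartbeat_interval"]
    session = settings["broker_session_timeout"]
    return heartbeat * 3 <= session


if __name__ == "__main__":
    # 6 * 3 = 18, which is well under the 120 second session timeout.
    print(heartbeat_within_guidance(reported_settings))
    # With Faust installed, the same dict could be passed to the app
    # (app id and broker URL are placeholders):
    # app = faust.App("my-app", broker="kafka://localhost:9092", **reported_settings)
```

By this check the reported heartbeat and session values are well within the usual guidance, so they do not by themselves point to a misconfiguration.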

Full traceback

2022-11-21T00:01:02.132Z nid_martens_sa_stream_processor ERROR [logger={faust.transport.consumer}] [^---Fetcher]: Crashed reason=ConsumerStoppedError()
Traceback (most recent call last):
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
    records = await self._getmany(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
    return await self.call_thread(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 822, in _fetch_records
    raise ConsumerStoppedError()
aiokafka.errors.ConsumerStoppedError
2022-11-21T00:01:02.137Z nid_martens_sa_stream_processor INFO [logger={faust.worker}] [^Worker]: Stopping...
2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={abc_logging_faust.app}] [^-abcLoggingFaustApp]: Stopping...
2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={faust.transport.consumer}] [^---Fetcher]: Stopping...
2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={abc_logging_faust.app}] [^-abcLoggingFaustApp]: Wait for streams...
2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor INFO [logger={aiokafka.consumer.group_coordinator}] LeaveGroup request succeeded
2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Exception in thread
2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Thread-1
2022-11-21T00:01:02.169Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
2022-11-21T00:01:02.169Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] StopIteration
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] FetchResponse_v4(throttle_time_ms=0, topics=[(topics='addr-pair-gather', partitions=[(partition=8, error_code=0, highwater_offset=466624, last_stable_offset=466624, aborted_transactions=NULL, message_set=b'')])])
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] The above exception was the direct cause of the following exception:
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.run()
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 66, in run
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.service._start_thread()
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.thread_loop.run_until_complete(self._serve())
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 616, in run_until_complete
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] return future.result()
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 264, in _serve
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._shutdown_thread()
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 240, in _shutdown_thread
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self.on_thread_stop()
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 309, in on_thread_stop
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._consumer.stop()
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/consumer.py", line 503, in stop
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._fetcher.close()
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 429, in close
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._fetch_task
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 557, in _fetch_requests_routine
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self.close()
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 439, in close
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await x
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 547, in _fetch_requests_routine
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] has_new_data = any(fut.result() for fut in done_pending)
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 547, in <genexpr>
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] has_new_data = any(fut.result() for fut in done_pending)
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 681, in _proc_fetch_request
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] time_response = monotonic()
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] SystemError
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
2022-11-21T00:01:02.177Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] <built-in function monotonic> returned a result with an error set
2022-11-21T00:06:02.165Z nid_martens_sa_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Warning: Task timed out!
2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Please make sure it's hanging before restart.
2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] (started at Mon Nov 21 00:01:02 2022) Replaying logs...
2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] (Mon Nov 21 00:01:02 2022) +consumer.commit()
2022-11-21T00:06:02.167Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] -End of log-
2022-11-21T00:06:02.167Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] Task traceback
2022-11-21T00:06:02.168Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Stack for <Task pending name='Task-2717096' coro=<Service.stop() running at /opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py:839> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc9effbdac0>()]> cb=[shield.<locals>._inner_done_callback() at /opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/tasks.py:885]> (most recent call last):
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 839, in stop
    await self.on_stop()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1467, in on_stop
    await self._stop_consumer()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1493, in _stop_consumer
    await self._consumer_wait_empty(consumer, self.log)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1499, in _consumer_wait_empty
    await consumer.wait_empty()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 757, in wait_empty
    await T(self.commit_and_end_transactions)()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 768, in commit_and_end_transactions
    await self.commit(start_new_transaction=False)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 822, in commit
    return await self.force_commit(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 857, in force_commit
    did_commit = await self._commit_tps(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 874, in _commit_tps
    return await self._commit_offsets(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 943, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1311, in _commit
    return await self._thread.commit(offsets)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in commit
    return await self.call_thread(self._commit, offsets)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise

Versions