robinhood / faust

Python Stream Processing

Worker hangs after crash #684

Open bohdantan opened 3 years ago

bohdantan commented 3 years ago

Checklist

## Steps to reproduce

```python
import asyncio

import faust

app = faust.App(
    'test-app',
    broker='kafka://localhost:9092',
    value_serializer='raw',
)

test_topic = app.topic('test')


@app.agent(test_topic)
async def process(stream):
    async for msg in stream:
        print(f'Start processing {msg}')
        # Sleep long enough for a second worker to join and trigger a rebalance.
        await asyncio.sleep(30)
        await app.crash(RuntimeError('CRASH'))
        print(f'Finish processing {msg}')


@app.task(on_leader=True)
async def on_started():
    print('Add messages')
    for i in range(5):
        msg = f'Message i={i}'
        await test_topic.send(key=msg, value=msg)
```


- Start a worker with `faust -A example worker -l info --without-web`. The worker creates the Kafka topic, and the task writes 5 messages to it.
- Right after the first worker instance starts processing a message (it prints `Start processing b'Message i=0'`), start a second worker instance with the same command: `faust -A example worker -l info --without-web`.
- The first worker logs `Heartbeat failed for group test-app because it is rebalancing`, then `Revoking previously assigned partitions`, and waits for the processing task.
- The first worker then crashes and begins stopping, but it hangs forever.

## Expected behavior

Worker stops after crashing.

## Actual behavior

Worker hangs after crash. The same happens if `AIOKafkaConsumerThread` crashes (e.g. here https://github.com/robinhood/faust/blob/master/faust/transport/drivers/aiokafka.py#L544), but that is harder to reproduce.
As far as I can tell, the cause is the order of calls during shutdown. First `AIOKafkaConsumerThread.on_thread_stop` is called, which stops the thread's method queue (`QueueServiceThread.method_queue`). Then `AIOKafkaConsumerThread.commit` calls `call_thread`, which adds a task to the already stopped method queue and waits for that task to complete. The task never completes because the method queue is stopped.
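
The ordering problem described above can be illustrated with a small toy model. This is only a sketch, not faust or mode code: `ToyMethodQueue` and its methods are hypothetical names, and the timeout exists only so the example terminates instead of hanging.

```python
import asyncio


class ToyMethodQueue:
    """Hypothetical stand-in for a worker-backed method queue (not the mode API)."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._worker = None

    def start(self):
        self._worker = asyncio.ensure_future(self._run())

    async def stop(self):
        # Once the worker is cancelled, nothing drains the queue any more.
        self._worker.cancel()
        await asyncio.gather(self._worker, return_exceptions=True)

    async def _run(self):
        while True:
            fun, promise = await self._queue.get()
            promise.set_result(fun())

    async def call(self, fun):
        # Enqueue the call, then wait for the worker to resolve the promise.
        promise = asyncio.get_running_loop().create_future()
        await self._queue.put((fun, promise))
        return await promise


async def main():
    queue = ToyMethodQueue()
    queue.start()
    # While the worker runs, the call completes normally.
    assert await queue.call(lambda: 'committed') == 'committed'

    await queue.stop()
    try:
        # Same call after stop: without the timeout this await never returns.
        return await asyncio.wait_for(queue.call(lambda: 'committed'), timeout=0.2)
    except asyncio.TimeoutError:
        return 'hung'


result = asyncio.run(main())
print(result)
```

The second `call` still enqueues its work successfully, which is why nothing fails loudly: the caller simply waits on a promise that no worker will ever resolve.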

## Full traceback

```pytb
[2020-11-04 13:15:21,272] [1327021] [ERROR] [^-App]: Crashed reason=RuntimeError('CRASH')
NoneType: None
[2020-11-04 13:15:21,273] [1327021] [WARNING] Finish processing b'Message i=0'
[2020-11-04 13:15:21,275] [1327021] [INFO] [^Worker]: Stopping...
[2020-11-04 13:15:21,277] [1327021] [INFO] [^-App]: Stopping...
[2020-11-04 13:15:21,277] [1327021] [INFO] [^---Fetcher]: Stopping...
[2020-11-04 13:15:21,278] [1327021] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[2020-11-04 13:15:21,278] [1327021] [INFO] [^-App]: Wait for streams...
[2020-11-04 13:15:21,281] [1327021] [INFO] LeaveGroup request succeeded
[2020-11-04 13:20:21,279] [1327021] [WARNING] [^--Consumer]: Warning: Task timed out!
[2020-11-04 13:20:21,279] [1327021] [WARNING] [^--Consumer]: Please make sure it's hanging before restart.
[2020-11-04 13:20:21,280] [1327021] [INFO] [^--Consumer]: [Flight Recorder-5] (started at Wed Nov  4 13:15:21 2020) Replaying logs...
[2020-11-04 13:20:21,280] [1327021] [INFO] [^--Consumer]: [Flight Recorder-5] (Wed Nov  4 13:15:21 2020) +consumer.commit()
[2020-11-04 13:20:21,280] [1327021] [INFO] [^--Consumer]: [Flight Recorder-5] -End of log-
[2020-11-04 13:20:21,280] [1327021] [INFO] [^--Consumer]: [Flight Recorder-5] Task traceback
[2020-11-04 13:20:21,282] [1327021] [INFO] [^--Consumer]: Stack for <Task pending name='Task-383' coro=<Service.stop() running at venv/lib/python3.8/site-packages/mode/services.py:862> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0c19ef2e80>()]> cb=[shield.<locals>._inner_done_callback() at /usr/lib/python3.8/asyncio/tasks.py:882]> (most recent call last):
  File "venv/lib/python3.8/site-packages/mode/services.py", line 862, in stop
    await self.on_stop()
  File "venv/lib/python3.8/site-packages/faust/app/base.py", line 1471, in on_stop
    await self._stop_consumer()
  File "venv/lib/python3.8/site-packages/faust/app/base.py", line 1497, in _stop_consumer
    await self._consumer_wait_empty(consumer, self.log)
  File "venv/lib/python3.8/site-packages/faust/app/base.py", line 1503, in _consumer_wait_empty
    await consumer.wait_empty()
  File "venv/lib/python3.8/site-packages/mode/services.py", line 460, in _and_transition
    return await fun(self, *args, **kwargs)
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 772, in wait_empty
    await T(self.commit_and_end_transactions)()
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 783, in commit_and_end_transactions
    await self.commit(start_new_transaction=False)
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 837, in commit
    return await self.force_commit(
  File "venv/lib/python3.8/site-packages/mode/services.py", line 460, in _and_transition
    return await fun(self, *args, **kwargs)
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 872, in force_commit
    did_commit = await self._commit_tps(
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 889, in _commit_tps
    return await self._commit_offsets(
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 958, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 1326, in _commit
    return await self._thread.commit(offsets)
  File "venv/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 524, in commit
    return await self.call_thread(self._commit, offsets)
  File "venv/lib/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
```

## Versions

taybin commented 3 years ago

I've seen this too. I've had to kill -9 the process pretty consistently.

lachuta commented 1 year ago

@bohdantan - Do you have any updates on this, by any chance? We are also seeing the same behavior.

bohdantan commented 1 year ago

@lachuta I do not have any updates. We have patched `faust.transport.drivers.aiokafka.AIOKafkaConsumerThread.commit` to write an error to the logs and return if `self.method_queue.should_stop` is set. Note that this repository has not been updated for more than 2 years. There is a fork, https://github.com/faust-streaming/faust, but I am not sure whether this bug is fixed there.
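
For readers who want the general shape of such a guard, here is a minimal sketch on a toy class. It is not the actual patch and does not use faust: `ToyConsumerThread` is a hypothetical stand-in, its `should_stop` attribute is assumed to play the role of `method_queue.should_stop`, and returning `False` for a skipped commit is an assumption of this sketch.

```python
import asyncio
import logging

logger = logging.getLogger('toy-consumer')


class ToyConsumerThread:
    """Hypothetical stand-in that only models the stop flag and the commit path."""

    def __init__(self):
        # Assumed to mirror method_queue.should_stop in the real class.
        self.should_stop = False

    async def call_thread(self, fun, *args):
        # Toy behavior: a call made after stop waits forever, like the hang above.
        if self.should_stop:
            await asyncio.Event().wait()
        return fun(*args)

    def _commit(self, offsets):
        return True

    async def commit(self, offsets):
        # Guard: log and return early instead of queueing work nobody will run.
        if self.should_stop:
            logger.error('Method queue stopped, skipping commit of %r', offsets)
            return False
        return await self.call_thread(self._commit, offsets)


async def main():
    thread = ToyConsumerThread()
    before = await thread.commit({'test-0': 1})
    thread.should_stop = True
    after = await thread.commit({'test-0': 2})
    return before, after


before, after = asyncio.run(main())
print(before, after)
```

The trade-off is that offsets passed to a skipped commit are not committed, so messages processed just before the crash may be redelivered after a restart.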

lachuta commented 1 year ago

@bohdantan - Is your patch available anywhere? Did it make it into a faust release? These are the versions we are using: