robinhood / faust


Faust hangs with wait_empty: Waiting for [...] tasks #402

Open maksimushka32 opened 5 years ago

maksimushka32 commented 5 years ago


Steps to reproduce

Faust is set up as a service and filters log messages from the _in topic to the _out topic with code like:

async for value in stream:
    # <text edit> -- the message is edited/filtered here, producing js_msg
    await output_topic.send(value=json.dumps(js_msg).encode())

I've got several of these "filters", but the issue occurs in only one of them.
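For reference, the edit/filter step can be isolated into a plain function so it is testable outside the agent; the field name "level" and the drop condition below are hypothetical stand-ins for the actual edit logic, not the reporter's code:

```python
import json
from typing import Optional


def filter_log(raw: bytes) -> Optional[bytes]:
    """Parse a log record, drop the ones we don't want,
    and re-serialize the rest. Returns None for dropped records."""
    js_msg = json.loads(raw)
    # Hypothetical condition: drop DEBUG-level records.
    if js_msg.get("level") == "DEBUG":
        return None
    return json.dumps(js_msg).encode()
```

Inside the agent loop this becomes `result = filter_log(value)` followed by `await output_topic.send(value=result)` only when `result` is not None.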

Expected behavior

Working

Actual behavior

Right after startup it begins logging INFO messages like:

[2019-08-12 17:39:31,158: INFO]: Timer _thread_keepalive woke up too late, with a drift of +15.980544116348028
[2019-08-12 17:39:31,455: INFO]: Timer commit woke up too late, with a drift of +16.265384099259972

Then, after days of running, it suddenly fails to commit an offset:

[2019-08-12 17:08:59,299: ERROR]: OffsetCommit failed for group TOPIC_processor due to group error ([Error 25] UnknownMemberIdError: TOPIC_processor), will rejoin

It re-joins the group several times, and after about an hour of rejoining it just hangs with messages like:

wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks

Full traceback

[2019-08-12 17:11:27,216: ERROR]: OffsetCommit failed for group TOPIC_processor due to group error ([Error 25] UnknownMemberIdError: TOPIC_processor), will rejoin
[2019-08-12 17:11:27,217: INFO]: Revoking previously assigned partitions frozenset() for group TOPIC_processor
[2019-08-12 17:11:27,217: WARNING]: Heartbeat failed: local member_id was not recognized; resetting and re-joining group
[2019-08-12 17:11:27,217: ERROR]: Heartbeat session expired - marking coordinator dead
[2019-08-12 17:11:27,217: WARNING]: Marking the coordinator dead (node 0)for group TOPIC_processor.
[2019-08-12 17:11:28,210: INFO]: Timer commit woke up too late, with a drift of +79.80464283265174
[2019-08-12 17:11:28,210: INFO]: Timer Monitor.sampler woke up too late, with a drift of +0.5432681329548359
[2019-08-12 17:11:28,211: INFO]: Timer _main_keepalive woke up too late, with a drift of +0.5433635283261538
[2019-08-12 17:11:38,409: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
[2019-08-12 17:11:49,996: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
[2019-08-12 17:12:01,508: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
[2019-08-12 17:12:13,053: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
[2019-08-12 17:12:24,593: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
[2019-08-12 17:12:25,211: WARNING]: [^-App]: Warning: Task timed out!
[2019-08-12 17:12:25,212: WARNING]: [^-App]: Please make sure it's hanging before restart.
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] (started at Mon Aug 12 17:11:28 2019) Replaying logs...
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] (Mon Aug 12 17:11:28 2019) flow_control.suspend()
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] (Mon Aug 12 17:11:28 2019) consumer.pause_partitions
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] (Mon Aug 12 17:11:28 2019) flow_control.clear()
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] (Mon Aug 12 17:11:28 2019) Wait for streams...
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] -End of log-
[2019-08-12 17:12:25,212: INFO]: [^-App]: [Flight Recorder-44] Task traceback
[2019-08-12 17:12:25,225: INFO]: [^-App]: Stack for <Task pending coro=<Service._execute_task() running at /usr/local/lib/python3.6/site-packages/mode/services.py:762> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f78303fa7c8>()]> cb=[Service._on_future_done()]> (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 762, in _execute_task
    await task
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 278, in _method_queue_do_work
    await process_enqueued(item)
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 342, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 133, in maybe_async
    return await res
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 456, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/faust/transport/consumer.py", line 568, in on_partitions_revoked
    revoked)
  File "/usr/local/lib/python3.6/site-packages/faust/utils/tracing.py", line 121, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.6/site-packages/faust/app/base.py", line 1476, in _on_partitions_revoked
    await T(self._consumer_wait_empty)(consumer, on_timeout)
  File "/usr/local/lib/python3.6/site-packages/faust/utils/tracing.py", line 121, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.6/site-packages/faust/app/base.py", line 1395, in _consumer_wait_empty
    await consumer.wait_empty()
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 456, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/faust/transport/consumer.py", line 749, in wait_empty
    await T(self._wait_for_ack)(timeout=1)
  File "/usr/local/lib/python3.6/site-packages/faust/utils/tracing.py", line 121, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.6/site-packages/faust/transport/consumer.py", line 726, in _wait_for_ack
    self._waiting_for_ack, loop=self.loop, timeout=1)
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter

[2019-08-12 17:12:36,129: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
[2019-08-12 17:12:47,808: WARNING]: [^--Consumer]: wait_empty: Waiting for [(0, <ConsumerMessage: TopicPartition(topic='TOPIC_in', partition=16) offset=1729599>)] tasks
.....


barelyreal commented 5 years ago

I'm also seeing the same problem. It seems more common when consuming from a topic with many partitions (~100 in my case) than from one with fewer (~30).

maksimushka32 commented 5 years ago

The main problem is that I have ~15 topics, all more or less the same: 50 partitions each and practically the same amount of incoming and outgoing data, yet the problem occurs in only one of them.

StephenSorriaux commented 5 years ago

Hi,

From your logs, it looks like your consumer was considered dead (most likely because it did not send heartbeats in time) and a rebalance occurred and finished before the consumer noticed. This left the consumer unable to commit its offset and stuck in this wait_empty: Waiting for... state.

You mentioned having 15 topics with 50 partitions each: is the load distributed across several Faust workers? Are messages on the topic that triggers this error the same size as on the other topics? In this kind of situation it is sometimes recommended to decrease the number of messages fetched on each poll(). In Faust you can do that by adjusting the broker_max_poll_records parameter (see https://faust.readthedocs.io/en/latest/userguide/settings.html#broker-max-poll-records)
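As a sketch, lowering the fetch size looks like this (the app name, broker URL, and the value 500 are illustrative, not recommendations):

```python
import faust

app = faust.App(
    "TOPIC_processor",
    broker="kafka://localhost:9092",
    # Fetch fewer messages per poll() so each batch finishes
    # (and heartbeats keep flowing) well within the session timeout.
    broker_max_poll_records=500,
)
```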

okeyokoro commented 5 years ago

any update on this?

okeyokoro commented 5 years ago

I can confirm that setting broker_max_poll_records=500 solved my problem

thanks @StephenSorriaux

Nozdi commented 5 years ago

I have the same issue.

APP config:

    'broker_max_poll_records': 20,
    'stream_buffer_maxsize': 10000,
    'broker_commit_every': 5000,
    'topic_partitions': 16,
    'broker_heartbeat_interval': 10,
    'broker_request_timeout': 160.0,
    'broker_session_timeout': 120.0,

I have three agents per worker with concurrency set to 1. There are 16 partitions and 2 workers. The problem occurs on every rescale from 2 to 3 workers, so I'm pretty sure setting broker_max_poll_records is not enough. To run synchronous code I use run_in_executor.
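The run_in_executor pattern mentioned here can be sketched without Faust at all; `blocking_work` is a placeholder for the actual synchronous code:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def blocking_work(value: int) -> int:
    # Placeholder for CPU- or I/O-bound synchronous code.
    return value * 2


async def handle(value: int) -> int:
    loop = asyncio.get_running_loop()
    # Offload the blocking call to a thread so the event loop
    # (which also drives Kafka heartbeats) is not starved.
    return await loop.run_in_executor(executor, blocking_work, value)
```

Starving the event loop with long synchronous calls is one way heartbeats stop flowing and the consumer gets kicked from the group, so keeping blocking work off the loop is relevant to this issue.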


Nozdi commented 5 years ago

For me, setting stream_wait_empty=False solved this issue: https://faust.readthedocs.io/en/latest/userguide/settings.html#stream-wait-empty

I could do that because my streams are idempotent, although I'm pretty sure it's only a workaround.
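As a sketch (the app name and broker URL are illustrative), the workaround looks like this; with stream_wait_empty=False the worker no longer waits for in-flight messages to be acknowledged before a rebalance, so duplicates become possible and the processing must be idempotent:

```python
import faust

app = faust.App(
    "TOPIC_processor",
    broker="kafka://localhost:9092",
    # Don't wait for currently processed messages to be fully
    # acknowledged when partitions are revoked; reprocessing
    # (duplicates) becomes possible, so handlers must be idempotent.
    stream_wait_empty=False,
)
```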

LiYunJamesPhD commented 4 years ago

Hi,

(Quotes @StephenSorriaux's earlier reply in full.)

@StephenSorriaux Thank you for providing the solution. Is it possible to lose records by decreasing the broker_max_poll_records parameter?

StephenSorriaux commented 4 years ago

@Li-Yun No, changing the broker_max_poll_records parameter will not increase your risk of losing records. The only risks I see are:

LiYunJamesPhD commented 4 years ago

@StephenSorriaux Thank you for your response.

LiYunJamesPhD commented 4 years ago

@StephenSorriaux In my Faust application I get the message "[INFO]: Timer commit woke up too late". Should this be treated as an error or just a warning? Thanks for reading.