faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Consumers slowly die over time #175

Open pb376 opened 3 years ago

pb376 commented 3 years ago

Running a basic application for any extended period of time - sometimes just a few minutes - leads to consumers slowly dying one-by-one until they've all stopped responding and the entire application hangs. The data is still flowing into a topic at the tip of the pipeline from an external source, so there are still messages to be read as they begin to hang.

The following messages are displayed in increasing numbers as things begin to die:

[2021-07-21 01:04:39,262] [31912] [ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='archive-log-events', partition=36) (started 1.10 hour ago). 

There are multiple possible explanations for this:                                                                                                

1) The processing of a single event in the stream                                                                                    
   is taking too long.                                                                                                                    

    The timeout for this is defined by the stream_processing_timeout setting,                                                             
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.                                                                                                                                                                     

 2) The stream has stopped processing events for some reason.                                                                       

3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop).  

I've tested this with agents performing simple functions and the behavior still occurs. Is there any obvious reason why this would be happening, or a reasonable way to debug it?

Checklist

Steps to reproduce

Run an application with multiple agents.
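
A minimal sketch of the kind of application involved (the second topic name and the broker URL are hypothetical; the real agents do more work, but even trivial ones show the hang):

import faust

app = faust.App("repro-app", broker="kafka://localhost:9092")
source = app.topic("archive-log-events", value_type=bytes)
processed = app.topic("archive-log-events-processed", value_type=bytes)

@app.agent(source)
async def forward(stream):
    # Trivial per-event work; the consumers still stop eventually.
    async for payload in stream:
        await processed.send(value=payload)

@app.agent(processed)
async def consume(stream):
    async for _ in stream:
        pass

if __name__ == "__main__":
    app.main()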

Expected behavior

Things run indefinitely.

Actual behavior

Things run and slowly begin to stop.

Full traceback

N/A

Versions

mavalderrama commented 3 years ago

We have the same issue right now; there is no clear error detail on what's going on.

PJ-Schulz commented 3 years ago

Have you tried to increase the broker_commit_livelock_soft_timeout or the broker_max_poll_interval?
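
For example (hypothetical app name and broker URL; the values are only illustrative):

import faust

app = faust.App(
    "my-app",
    broker="kafka://localhost:9092",
    broker_commit_livelock_soft_timeout=1800.0,  # seconds; default is 300.0
    broker_max_poll_interval=3000.0,             # seconds
)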

mavalderrama commented 3 years ago

Have you tried to increase the broker_commit_livelock_soft_timeout or the broker_max_poll_interval?

thanks, I'll try it in production

taybin commented 3 years ago

@mavalderrama Did that resolve your problem?

oneryalcin commented 3 years ago

Yes, I noticed this behaviour too. Does anybody have a solution to this?

mavalderrama commented 3 years ago

@mavalderrama Did that resolve your problem?

I'm sorry, we changed those params to big numbers without success

oneryalcin commented 3 years ago

@mavalderrama Did that resolve your problem?

I'm sorry, we changed those params to big numbers without success

Many thanks @mavalderrama for testing it. I'll try running faust in debug mode to check if I can see anything in the logs.

pikhovkin commented 2 years ago

same issue

[2022-02-17 10:08:01,225] [1] [ERROR] [^---AIOKafkaConsumerThread]: Stream stopped processing, or is slow for TP(topic='my_topic', partition=0) (last inbound 1.92 day ago). 

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the stream_processing_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

 2) The stream has stopped processing events for some reason.

3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop). 

rafaelcapucho commented 2 years ago

I have exactly the same thing happening.

Faust: 0.8.1, Python: 3.7.11

Rydra commented 2 years ago

I'm joining the club of having the same issue. In fact, this error, and the fact that agents die and do not heal automatically, is the only thing that blocks me from using this project in a production system. A pity, since while it's running it works very well and it's very easy to set up.

In my case I'm using faust inside a docker image:

Operating system: python:3.8-slim-buster Docker image. Faust version: 0.8.4. Python: 3.8

rafaelcapucho commented 2 years ago

I noticed that if you delete the topic in Kafka and recreate it, the error stops appearing, but after a while (some days) it starts again.

Note: I'm using a Kafka Broker from Confluent Cloud, their managed service.

It is super frustrating; we're moving a considerable part of our code base to be Faust-based, but it doesn't look reliable during our development.

payamesfandiari commented 2 years ago

Can you guys provide sample code so we can reproduce the same behavior? We have been using Faust extensively in production for almost two years and have never had the same issue.

elacuesta commented 2 years ago

I'm on @rafaelcapucho's team; we were able to find the cause of the issue in our case. We were not going back far enough in the logs, but after doing so we saw agents crashing because of exceptions:

[2022-07-05 18:24:38,244] [1] [ERROR] [^----FaustAgent*: [.]our_agent isolated={3}]: Crashed reason=TypeError("__init__() got an unexpected keyword argument 'size'") 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 696, in _slurp
    async for value in it:
  File "/app/.../agents.py", line 415, in our_agent
    value = await do_something(...)
    ...
TypeError: __init__() got an unexpected keyword argument 'size'

After the above, but before the "Stream stopped processing, or is slow" messages, we would see:

[2022-07-05 18:28:08,128] [1] [ERROR] [^---AIOKafkaConsumerThread]: Has not committed TP(topic='topic-name', partition=3) at all since worker start (started 5.02 minutes ago).

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

 2) The commit handler background thread has stopped working (report as bug). 

As a note, we are iterating over streams using faust.Stream.noack and manually acking events with faust.Stream.ack; I'm not entirely sure that makes a difference in this case as opposed to iterating over the stream directly.

Hopefully this helps somebody to get unstuck, cheers!

wbarnha commented 2 years ago

I've also been encountering similar errors. I think the first step out of this mess is to give agents better logging capabilities. @elacuesta would you be allowed to show us more of your code for example? I'm having trouble connecting the dots for this segment:

  File "/app/.../agents.py", line 415, in our_agent
    value = await do_something(...)
    ...
TypeError: __init__() got an unexpected keyword argument 'size'

elacuesta commented 2 years ago

Sorry, I don't have those logs anymore. What I posted was an anonymized version of an actual agent we have, basically to show that there was an exception somewhere down the call chain. Specifically, the agent our_agent was invoking the do_something coroutine, which was in turn invoking other coroutines, and so on, until some code tried to create an object passing an invalid keyword argument (TypeError: __init__() got an unexpected keyword argument 'size'). The missing part of the traceback is not relevant; it's just the call chain that led to the exception from the initial invocation in the agent.

pb376 commented 2 years ago

I decided to give faust another try today, a year after submitting this issue and giving up shortly afterwards, but pretty much immediately hit this same problem again. There were exceptions being thrown due to a bug around topics.py:465 (FutureMessage has no channel attribute) but after fixing that, and ensuring that there were absolutely no other exceptions being thrown, the consumers still continued to slowly die until the application ground to a halt. This application has essentially no overlap in (our) code, configuration, event patterns, purpose, etc, with the one from last year that also showed this issue.

I'm curious to learn about cases where faust is being successfully used in production for a non-trivial workload, and how the configurations and pipeline architecture being used differ from ours. I have to imagine this issue is affecting more than just a small number of users, given the steady flow of people popping into this thread and blog posts like this, so I hope this issue (and similar) make it to the top of the priority list for those working on/comfortable with the codebase.

If anyone can provide a non-trivial example project that doesn't show this behavior, I can run it in our environment and see how it fares, which may hopefully help start narrowing things down a bit.

wbarnha commented 2 years ago

(FutureMessage has no channel attribute)

I just took a look in topics.py and I see immediately what you're talking about. I'll make a patch for that ASAP in https://github.com/faust-streaming/faust/pull/342.

I'm curious to learn about cases where faust is being successfully used in production for a non-trivial workload

I've had success using Faust in production for synchronizing information across multiple applications; I'm not sure how much detail I'm allowed to go into about our use case, though.

Taken from the link you provided, https://kapernikov.com/a-comparison-of-stream-processing-frameworks:

Faust has support for creating offline unit tests (so you could test and debug a stream processor without having a Kafka broker at your disposal).

We really need to make our unit tests more readable. So many existing issues in this project come from the lack of tests when a new feature is added (which I'm guilty of).

batman616 commented 1 year ago

Excuse me! Could someone please let me know whether this issue has been fixed yet? My consumer works through about 10% of the messages on the Kafka topic and then stops, and I don't know what to do. Please help me.

[2022-12-07 23:38:44,782] [74736] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks [(1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=2) offset=0>), (1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=3) offset=0>)] 
[2022-12-07 23:38:44,790] [74736] [INFO] [^--Consumer]: Agent tracebacks:
=======================================
 TRACEBACK OF ALL RUNNING AGENT ACTORS
=======================================

Rydra commented 1 year ago

@batman616 in the end, this consumer problem ended up being a showstopper for me, so I moved on from faust to try something else. In my case, I've been using aiokafka in a production system and I'm pleased so far: no consumers dying, and a simpler solution.

patkivikram commented 1 year ago

wait_empty: Waiting for tasks [(1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=2) offset=0>), (1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=3) offset=0>)]

If your consumers are idempotent, set stream_wait_empty=False in your app and you should not see any issues on rebalance.

https://faust-streaming.github.io/faust/userguide/settings.html#stream-wait-empty
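
For example (a sketch with a hypothetical app name; only safe if reprocessing a message on rebalance is acceptable for you):

import faust

app = faust.App(
    "my-app",
    broker="kafka://localhost:9092",
    # Don't wait for stream buffers to drain during rebalance; in-flight
    # messages may then be processed again by the new assignee.
    stream_wait_empty=False,
)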

patkivikram commented 1 year ago

aiokafka assumes your consumers are idempotent and does not wait until the consumer has processed all the messages it fetched, which is the behavior you get with stream_wait_empty=False.

thomas-chauvet commented 1 year ago

I had the same issue; I resolved it for now by scaling my Faust worker. Faust is running on k8s and I spawned two pods instead of one. It seems to hold up like this.

My issue appeared when lag was very high on different topics. The load was not evenly distributed across the partitions, which may be the cause of the issue.

The pod scaling seems to do the trick for now.

maparent commented 1 year ago

Same issue. Having multiple workers does not seem to help (though the second worker being killed restarted the first?). My Kafka (kafka_2.12-3.3.1) is using KRaft, fwiw. This is a simple case: the topic that fails has a single partition and a single worker. More interesting: the consumer seems to stop spontaneously after consuming exactly 4097 messages, i.e. 4K+1. I hope that may help somebody find the cause.

maparent commented 1 year ago

Hmmm... maybe my problem is different. I guess I was hitting stream_buffer_maxsize. I was initially looping over stream.events(), which seems to disable auto-ack, or maybe something else; sorry for the noise.

However, I changed it to a more standard iteration over the stream itself, same issue. (Or rather it still stops after 4K messages, but I'm not sure it's for the same reason: see below.)

Then I tried setting broker_commit_every=2000 to get faster acks, and now I'm told my events are not acknowledged at all (and if I interrupt the worker, it refuses to quit and gives an error message ending with "The commit handler background thread has stopped working"... or that my messages are taking too long; they're not). Also, the same messages now come every time, consistent with events not being acknowledged. (That is new.)

daniel-martinalbo commented 1 year ago

I'm having the same issue as @maparent. After 4097 messages, it stops. Were you able to solve the problem?

maparent commented 1 year ago

I'm afraid I also gave up and went with aiokafka... Sorry.

cristianmatache commented 1 year ago

TL;DR

I believe some of these errors are false alarms.

Long-winded comment

There are a bunch of issues that lead to this behavior. I will put in my two cents on this topic. I believe the logic in the verify_event_path (aiokafka.py) function is a bit off. There are race conditions that become apparent when dealing with topic partitions with very low throughput.

In aiokafka.py there is a Consumer that runs several of its methods in a background thread. Moreover, most Faust code is written with asyncio. As a consequence, different pieces of state in the system are populated asynchronously (e.g., stream_inbound_time, the time of the latest message received from a topic partition, and tp_last_committed_at, the time we last committed an offset for a certain topic partition). Because this is a Consumer, it periodically runs several performance checks on all assigned topic partitions.

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/transport/consumer.py#L896-L909

Log messages as the one posted by the OP (@pb376) are logged from here https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/transport/drivers/aiokafka.py#L744

SLOW_PROCESSING_STREAM_IDLE_START

Let's start with SLOW_PROCESSING_STREAM_IDLE_START (which is the OP's problem). Inside verify_event_path the logic behind this warning is:

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/transport/drivers/aiokafka.py#L757-L763

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/transport/drivers/aiokafka.py#L774-L787

Here we will constantly be getting the high watermark (i.e., the latest available offset) and the last committed offset every time verify_event_path runs. For a low-throughput topic partition that has not ticked since the start of the faust app these will be equal (assuming everything is resuming from a non-corrupted state). Let's assume further there has been no event on the low-throughput topic for longer than self.tp_stream_timeout_secs after the app started.

As long as highwater == committed_offset, no warning is produced. Let's see how the warning is produced. Assume an event is produced on that topic partition. The next time verify_event_path runs, it will find that the high watermark (highwater, i.e. the latest offset on that partition) changed because a new message has been produced (hence highwater > committed_offset), but our app might not yet have consumed it, so there is no entry in monitor.stream_inbound_time for the low-throughput topic partition that was just updated. Thus, the warning is produced even though everything is fine.

Note that verify_event_path and monitor.on_stream_event_in (the method that populates monitor.stream_inbound_time) are running concurrently via asyncio in the same thread:

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/sensors/monitor.py#L453-L458

which is called from

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/sensors/base.py#L181-L188

which is called from

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/streams.py#L1190

where on_stream_event_in is the method of the app's sensors which is a SensorDelegate.

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/streams.py#L157
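
To make the race concrete, here is a condensed sketch of the SLOW_PROCESSING_STREAM_IDLE_START condition described above (simplified names, not the actual Faust source):

import time

def stream_idle_warning(highwater, committed_offset, last_inbound, tp_stream_timeout_secs):
    """Roughly what verify_event_path checks before logging the warning."""
    if highwater is None or committed_offset is None:
        return False
    if highwater <= committed_offset:
        return False  # quiet partition: nothing pending, no warning
    if last_inbound is None:
        # The race: a message just arrived (highwater moved past committed_offset),
        # but monitor.on_stream_event_in has not populated stream_inbound_time yet,
        # so the partition looks as if it never started processing.
        return True
    return time.monotonic() - last_inbound > tp_stream_timeout_secs

# A quiet partition whose first message was just produced triggers the warning:
assert stream_idle_warning(highwater=11, committed_offset=10,
                           last_inbound=None, tp_stream_timeout_secs=300.0)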

SLOW_PROCESSING_NO_COMMIT_SINCE_START

Let's turn our attention to the SLOW_PROCESSING_NO_COMMIT_SINCE_START and put it into the perspective of a very low throughput topic. This is the logic that triggers this warning, in the same verify_event_path method.

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/transport/drivers/aiokafka.py#L802-L813

self.tp_last_committed_at is populated in the _commit method inside the same class AIOKafkaConsumerThread. However, the verify_event_path and _commit methods run concurrently in different threads. _commit is called with call_thread (since AIOKafkaConsumerThread is a ServiceThread) so it schedules methods for execution on a MethodQueue in the separate thread.

https://github.com/faust-streaming/faust/blob/da90c7cdd08487f480cc2603716c4bedb0ffd7c9/faust/transport/drivers/aiokafka.py#L707-L723

As before, let's assume the low-throughput topic partition hasn't had any new messages for longer than self.tp_commit_timeout_secs seconds since the app was started. All this time self.tp_last_committed_at will not have an entry for this topic partition. Naturally, we don't commit the offset of something that we haven't even received. Once a message is received, we concurrently handle it, run verify_event_path, and commit the offset in a separate thread.

Again, as soon as a message is produced, highwater > committed_offset, so the verify_event_path check proceeds, but the committing thread may not yet have committed the offset. So when verify_event_path runs, it finds no committed offset for that topic partition for longer than self.tp_commit_timeout_secs since the start, and logs the warning.
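
The commit-side race, sketched in the same simplified style (again, not the actual source):

import time

def no_commit_warning(highwater, committed_offset, last_committed_at,
                      tp_commit_timeout_secs, worker_started_at):
    """Roughly the SLOW_PROCESSING_NO_COMMIT_SINCE_START condition."""
    if highwater is None or committed_offset is None or highwater <= committed_offset:
        return False  # nothing pending for this topic partition
    if last_committed_at is None:
        # tp_last_committed_at is only filled in by _commit, which runs on the
        # consumer thread; right after the first message arrives it may not have
        # run yet, so the partition looks uncommitted "since worker start".
        return time.monotonic() - worker_started_at > tp_commit_timeout_secs
    return time.monotonic() - last_committed_at > tp_commit_timeout_secs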

Experiment to reproduce these issues

Include the thread id in log messages. Add logging in verify_event_path, and add logging in the places where these variables (monitor.stream_inbound_time and tp_last_committed_at) are mutated.

Create a Faust app that reads from one topic. Send a single message to the topic after the configurable timeouts (i.e., self.tp_commit_timeout_secs and self.tp_stream_timeout_secs) have passed.
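
A small sketch of that experiment (hypothetical topic name and broker URL; timeouts left at their 300.0 s defaults):

import faust

app = faust.App("false-alarm-repro", broker="kafka://localhost:9092")
quiet_topic = app.topic("quiet-topic", value_type=bytes)

@app.agent(quiet_topic)
async def consume(stream):
    async for value in stream:
        print("consumed", value)

@app.timer(interval=400.0)  # first fires only after the 300 s timeouts have expired
async def send_one_message():
    # The first send lands after tp_stream_timeout_secs / tp_commit_timeout_secs
    # have passed, which is when verify_event_path can log the warnings even
    # though nothing is actually stuck.
    await quiet_topic.send(value=b"late message")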

@wbarnha please correct me if I'm wrong.

mAtthEwwww commented 7 months ago

After setting the environment variables like this, the problem seems to have been resolved:

export BROKER_MAX_POLL_INTERVAL=3000
export BROKER_MAX_POLL_RECORDS=50
export BROKER_REBALANCE_TIMEOUT=600
export BROKER_REQUEST_TIMEOUT=900
export BROKER_SESSION_TIMEOUT=600
export BROKER_HEARTBEAT_INTERVAL=30
export STREAM_PROCESSING_TIMEOUT=3000
export BROKER_COMMIT_LIVELOCK_SOFT_TIMEOUT=3000
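
For anyone configuring this in code rather than through the environment, the same values map onto app settings (a sketch with a hypothetical app name and the numbers above):

import faust

app = faust.App(
    "my-app",
    broker="kafka://localhost:9092",
    broker_max_poll_interval=3000,
    broker_max_poll_records=50,
    broker_rebalance_timeout=600,
    broker_request_timeout=900,
    broker_session_timeout=600,
    broker_heartbeat_interval=30,
    stream_processing_timeout=3000,
    broker_commit_livelock_soft_timeout=3000,
)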

Luferov commented 6 months ago

Similar problem and it's very sad

MedAzizTousli commented 3 months ago

Same problem here with faust version 0.11.2

pawel-swiecki-saucelabs commented 1 month ago

0.11.3 and nothing changed

Luferov commented 1 month ago

We'll change Faust to faststream 🤷

MedAzizTousli commented 1 week ago

My updates regarding this issue:

  1. I solved my issue a month ago by reducing the complexity of the topic listener. I was receiving around 3000 messages each minute, arriving in a span of 5-10 seconds, and I had O(100) processing for each new message. I turned that O(100) into O(5).
  2. Yesterday, I encountered a new bizarre issue. Running the faust worker locally works completely fine, but running it in a Docker image makes the Faust receiver lag.

cristianmatache commented 1 week ago

Another potential reason for this happening is that faust must flush its outgoing buffers to commit. Remember that faust guarantees processing at least once. https://github.com/faust-streaming/faust/blob/ff75c0be3d784d28b9f69ba3ea94be769b151b89/faust/types/enums.py#L4-L6

If you use faust to read from one topic and push to another, faust has to flush its producer buffers before committing. aiokafka is not very fast at flushing buffers (especially compared to librdkafka). In extreme cases, flushing can take longer than your max poll interval and/or session timeout. One way to work around that is to monkey-patch faust's producer:

from confluent_kafka import Producer  # librdkafka-based producer
from faust.types.tuples import FutureMessage

producer = Producer(...)

async def _patched_send_pending(fut: FutureMessage) -> None:
    # Hand each buffered Faust message over to the confluent-kafka producer.
    producer.produce(
        fut.message.channel.get_topic_name(),
        value=fut.message.value,
        key=fut.message.key,
        partition=fut.message.partition,
    )
    producer.poll(0)  # serve delivery callbacks without blocking

async def _patched_flush(*args) -> None:
    # librdkafka flushes much faster than aiokafka does here.
    producer.flush()

app.producer.buffer._send_pending = _patched_send_pending
app.producer.buffer.flush = _patched_flush

LunaTiy commented 5 days ago

Found this problem too. Our team has a Kafka consumer service with simple logic (it just reads and processes messages), but after 5 minutes of working, the process failed in the pods (we have 8 pods in k8s for this app).

We spent a long time looking for the problem, since we have one service that works perfectly with Faust while another service crashed with this error.

In the end, after many hours of debugging and searching, we realized that an error occurs in the Faust deserializer.

If you use a Faust deserializer (on the TopicT), check it. Faust doesn't raise errors from the deserializing process.

Example:

order_topic = app.topic(
    SMKafkaTopic.orders,
    key_serializer=KeyDeserializer(),
    value_serializer=OrdersDeserializer(),  # <-- the problem was here
)

from google.protobuf.json_format import MessageToDict

from src.base.deserializing.base import AbstractCodecDeserializer
from src.integrations.orders.proto import orders_pb2
from src.integrations.orders.schemas.incoming.order import Order

class OrdersDeserializer(AbstractCodecDeserializer):
    """Orders messages deserializer from kafka.""""

    def loads(self, message: bytes) -> Order:
        try:
            order = orders_pb2.Order()
            order.ParseFromString(message)

            order_dict = MessageToDict(message=order, preserving_proto_field_name=True, use_integers_for_enums=True)
            return Order(**order_dict)
        except BaseException as e:
            print(e)  # <-- you could see the error here

Ideally, make a decorator for the deserialization method to catch such errors and log them.
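
For example, something along these lines (a sketch, not tested; it reuses OrdersDeserializer, AbstractCodecDeserializer, Order, orders_pb2 and MessageToDict from the snippet above):

import functools
import logging

logger = logging.getLogger(__name__)

def log_deserialization_errors(loads):
    """Log and re-raise any exception escaping a deserializer's loads()."""
    @functools.wraps(loads)
    def wrapper(self, message: bytes):
        try:
            return loads(self, message)
        except Exception:
            logger.exception("Failed to deserialize message of %d bytes", len(message))
            raise
    return wrapper

class OrdersDeserializer(AbstractCodecDeserializer):
    """Orders messages deserializer from kafka."""

    @log_deserialization_errors
    def loads(self, message: bytes) -> Order:
        order = orders_pb2.Order()
        order.ParseFromString(message)
        order_dict = MessageToDict(message=order, preserving_proto_field_name=True, use_integers_for_enums=True)
        return Order(**order_dict)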

Versions

Python version: 3.11.7

faust-streaming = "^0.11.3" faust-prometheus-exporter = "^0.1.9"

Image: python slim 3.11 App deploying into k8s