faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

New error messages seen in faust app with aiokafka 0.7.1 #166

Open patkivikram opened 3 years ago

patkivikram commented 3 years ago

## Steps to reproduce

Bring up the Faust app and let it run for a while. After running for some time, the application starts logging the following error messages:


2021-06-28 10:57:33 [1] [ERROR] faust.transport.drivers.aiokafka [^----AIOKafkaConsumerThread]: Has not committed TP(topic='test_topic', partition=27) at all since worker start (started 5.52 minutes ago).

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.
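
For reference, the soft timeout named in that message is an ordinary app setting; a minimal sketch of raising it (app name and broker URL are placeholders, not from this report):

import faust

# Hypothetical app shown only to illustrate raising the soft timeout;
# the default is 300.0 seconds.
app = faust.App(
    "example-app",
    broker="kafka://localhost:9092",
    broker_commit_livelock_soft_timeout=600.0,
)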

## Expected behavior

These errors should only happen if the commit offset is not progressing

## Actual behavior
The app is running fine and consumer offsets are actually updated

# Versions

* Python version 3.9
* Faust version 0.6.7
* Operating system Centos75
* Kafka version 2.4.1
bluefatter commented 3 years ago

Thanks for fixing this! 🎉

bitdivision commented 3 years ago

We're still seeing this issue with v0.6.9

patkivikram commented 3 years ago

@bitdivision what are you seeing? Is this on a single worker or on multiple?

bitdivision commented 3 years ago

We've now ignored all of the following logs in our Sentry configuration; however, we are still seeing them in 0.6.9:

        "[^---AIOKafkaConsumerThread]: Has not committed",
        "[^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for",
        "[^---AIOKafkaConsumerThread]: Stream has not started processing",
        "[^---AIOKafkaConsumerThread]: Stream stopped processing, or is slow for",

We're running with 6 workers. I can give more details on specific logs if that's helpful?

Edit: to add to this, we've seen these messages for topics which are clearly being processed correctly. We would have seen lag increase if a topic had stopped processing, but that hasn't been the case.

patkivikram commented 3 years ago

Can you share the logs you are seeing in 0.6.9 with more than 1 worker? It should not happen with 1 worker, I hope :)

popadi commented 3 years ago

Any update on this? It still happens and it's hard to tell if it's a real issue or not. I'm using 0.6.10.

Edit: Apparently one of my two agents was publishing thousands of messages every second and it was taking all the resources away from the other agents. The error stopped appearing after I added more replicas to my app and bumped the resources a bit.

richardhundt commented 2 years ago

If you run a worker with 1 topic and N partitions, it only consumes from a single partition. After 300 seconds the monitor starts complaining that N-1 partitions are idle.

I've had to set isolated_partitions=True but this can't possibly be the intended behaviour, can it?
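
For reference, enabling that looks roughly like the sketch below (app name, broker URL, and topic are placeholders):

import faust

app = faust.App("example-app", broker="kafka://localhost:9092")
topic = app.topic("example-topic")

# isolated_partitions=True spawns one actor instance per assigned partition
# instead of sharing one actor across all of them.
@app.agent(topic, isolated_partitions=True)
async def process(stream):
    async for event in stream:
        ...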

daigotanaka commented 2 years ago

Hi all,

I am having what seems to be the same problem with faust-streaming==0.8.5.

Here is my setup:

I have a source topic with 6 partitions. As a test, I started a worker, and it's moving the offsets for Partitions 2, 3, 4, and 5. But 0 and 1 aren't processed, and they keep lagging. I added/removed a couple more workers. Each time a worker joins or leaves the group, a reassignment happens. No matter which worker gets assigned Partition 0 or 1, those partitions never move their offsets.

Our Kafka cluster is on Confluent Cloud. (Don't think it matters?)

I am also wondering if there is a bad record that's causing something like an infinite loop? If that is the case, the fix is on us, of course. But I could not even do

from faust.types import TP

tp = TP(<source_topic_name>, 0)
await app.consumer.seek(tp, <some_offset>)

to skip to a certain offset.

So I just wanted to see if the problem in this issue is still alive, to eliminate the possibility that our issue is caused by a Faust bug.

@richardhundt: You mentioned the isolated_partitions param. I checked the docs but could not understand what it does. Can you tell me how it helped you fix your issue? Thanks!

Update: I just noticed my issue happens even with a single worker. This time, Partition 0 is stuck at offset=1. Other partitions are moving forward nicely.

richardhundt commented 2 years ago

@daigotanaka

As far as I can tell, the isolated_partitions parameter causes an actor to be spawned for each partition.

I expected that concurrency lets you create N actors for M partitions, including running N actors for a single partition or 1 actor for M partitions, but that doesn't seem to be how it works. The docs are kind of hand-wavy on the details, so I'm not sure if my understanding is correct.

I also found that in order to seek I needed to use an AIOKafkaConsumer directly. Something like this:

from aiokafka.consumer import AIOKafkaConsumer
from aiokafka import TopicPartition
from faust import TopicT

async def seek_topic_partition(topic: TopicT, partition: int, offset: int):
    # Stand-alone aiokafka consumer sharing the Faust app's event loop and
    # consumer group id (bootstrap_servers defaults to localhost:9092 unless
    # passed explicitly).
    app = topic.app
    consumer = AIOKafkaConsumer(loop=app.loop, group_id=app.conf.id)
    tp = TopicPartition(topic.get_topic_name(), partition)
    await consumer.start()
    consumer.assign([tp])
    await consumer.seek(tp, offset)
    await consumer.stop()
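
A call would look something like this (the topic object, partition, and offset are hypothetical):

# e.g. skip partition 0 of some Faust topic forward to offset 1234
await seek_topic_partition(my_topic, partition=0, offset=1234)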
daigotanaka commented 2 years ago

@richardhundt Thank you for the details! I'll try AIOKafkaConsumer :)

joekohlsdorf commented 2 years ago

This was introduced in 0.6.5 when we actually started calling the verification: https://github.com/faust-streaming/faust/commit/7a45b2b4cd15b3ac8c562b6f2e0ac3368b23e4fa#diff-5704609ad5592d977f497ac5defed2c54606a1bf7e42f0677ddf88f59f47938bR278

The code doesn't care whether commits actually go through; offsets are set in a dictionary, and that dictionary is all the check looks at.

This probably never worked. I didn't have time to look into it in detail, but my guess is that the global variable is read and updated from different threads and isn't really global. Committing to Kafka works fine; offsets advance on all partitions.

In https://github.com/faust-streaming/faust/issues/153 people also complained about a significant performance regression when this additional check was enabled.

Until we find the issue, you can go back to 0.6.4 or patch this check out.
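
If patching Faust isn't an option, another way to quiet these specific messages is a standard logging filter on the driver's logger (logger name and message fragments taken from the logs above); a sketch:

import logging

class IgnoreLivelockWarnings(logging.Filter):
    # Drop the noisy verification messages emitted by the aiokafka driver.
    PATTERNS = (
        "Has not committed",
        "Aiokafka has not sent fetch request for",
        "Stream has not started processing",
        "Stream stopped processing, or is slow for",
    )

    def filter(self, record: logging.LogRecord) -> bool:
        return not any(p in record.getMessage() for p in self.PATTERNS)

logging.getLogger("faust.transport.drivers.aiokafka").addFilter(IgnoreLivelockWarnings())

Note this only hides the symptom; the underlying check still runs.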

daigotanaka commented 2 years ago

A follow-up to this comment: https://github.com/faust-streaming/faust/issues/166#issuecomment-1191098178

I stopped seeing messages like "Has not committed TP(topic='test_topic', partition=27) at all since worker start (started 5.52 minutes ago)", and all the partitions started processing as expected as multiple workers joined/left the group, after finding the following misconfiguration on our end:

The issue was a mismatch in the replication factor between the app config and the Topic object. We were using the MyTopic class workaround for changing the number of Topic replicas, adapted from here: https://github.com/faust-streaming/faust/issues/76

That left the replica settings in app.conf and on the topic disagreeing. Properly aligning them via the env var TOPIC_REPLICATION_FACTOR resolved our issue.
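
Concretely, the aligned form is something like the sketch below (names and values are placeholders, not our actual config):

import faust

# The app-level default and the explicit Topic replicas should agree.
app = faust.App(
    "example-app",
    broker="kafka://localhost:9092",
    topic_replication_factor=3,
)
source_topic = app.topic("example-topic", partitions=6, replicas=3)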

This might be a novice mistake, but just leaving a note here anyways in case it's useful.

Thanks @richardhundt and @joekohlsdorf for providing the pointers! Reading those helped to narrow down the issue :)


joekohlsdorf commented 2 years ago

@wbarnha Could you please explain why you closed this issue?

I don't see any recent changes to the problematic verification code I showed in https://github.com/faust-streaming/faust/issues/166#issuecomment-1192670398

I can still reproduce the problem and the solution posted by @daigotanaka does not work for me.

wbarnha commented 2 years ago

Thanks for getting back to me. I thought this was fixed by @daigotanaka, but I'll go ahead and re-investigate.

patkivikram commented 2 years ago

This should be fixed with https://github.com/faust-streaming/faust/pull/380 - can you please test it, @joekohlsdorf?

patkivikram commented 2 years ago

Anyone still seeing this with the latest release?

JonathanSerafini commented 2 years ago

Most if not all of our faust-streaming 0.9.2 consumers are spitting out a bunch of these errors on and off... interestingly, this also includes the assignor leader topic:

[^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='findings-processor-assignor-leader', partition=0) since start (started 7.57 hours ago)

bhanuka-ilume commented 1 year ago

Still seeing the issue in faust-streaming 0.9.5

[ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='', partition=0) (started 18.25 hours ago).
wbarnha commented 1 year ago

> Still seeing the issue in faust-streaming 0.9.5
>
> [ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='', partition=0) (started 18.25 hours ago).

I've also seen this error come up while Faust is actually running normally, so it's a bit hard to troubleshoot. I think the solution lies in reviewing our aiokafka drivers to log when everything is running nominally.

alihoseiny commented 1 year ago

For the record, we are facing the exact same issue using faust-streaming 0.10.13 and Python 3.11.3.

richardhundt commented 1 year ago

Something to consider: if you have a large consumer_max_fetch_size (the default is 1024 ** 2 bytes), have small messages, and your agent takes a long time to process each message, then you could see this issue.

What happens is that you fetch a chunk of 1048576 bytes in a single poll of Kafka; if your messages are 1 KB on average, that's about 1k messages. If each message takes 1 second to process, then you end up polling Kafka only once every ~17 minutes, and that'll trigger this error.
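
Back-of-the-envelope, under those same assumptions (1 MiB fetches, ~1 KB messages, ~1 s of processing per message):

# Rough arithmetic only; the sizes and timings are illustrative assumptions.
fetch_size_bytes = 1024 ** 2        # default consumer_max_fetch_size
avg_message_bytes = 1024            # assumed average message size
seconds_per_message = 1.0           # assumed agent processing time

messages_per_poll = fetch_size_bytes // avg_message_bytes              # 1024
minutes_between_polls = messages_per_poll * seconds_per_message / 60   # ~17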

Try setting consumer_max_fetch_size to something much smaller. There are a couple of other settings you can play with. Here's my setup for feeding an OCR system with file names where the OCR backend takes several seconds per message:

app = faust.App(
    ...
    broker_heartbeat_interval=3,
    broker_session_timeout=120,
    broker_request_timeout=240,
    broker_max_poll_records=30,                   # fewer records per poll
    broker_max_poll_interval=120000,
    broker_commit_livelock_soft_timeout=30 * 60,  # relax the livelock warning
    stream_processing_timeout=30 * 60,            # relax the slow-stream warning
    consumer_max_fetch_size=2048,                 # much smaller fetches (bytes)
    ...
)

Here I'm trying to increase the polling frequency by limiting max poll records and max fetch size, while increasing intervals and timeouts.

joekohlsdorf commented 1 year ago

This is true but it also happens in environments which process millions of small messages. We already know that this problem was introduced with this change. I don't know what's wrong with this validation but removing it fixes the problem (and speeds up processing significantly): https://github.com/faust-streaming/faust/commit/7a45b2b4cd15b3ac8c562b6f2e0ac3368b23e4fa#diff-5704609ad5592d977f497ac5defed2c54606a1bf7e42f0677ddf88f59f47938bR278

qlhai commented 1 year ago

I also face this error message in my project.

faust-streaming 0.10.16, aiokafka 0.8.1, kafka-python 2.0.2, Python 3.9.15, Debian GNU/Linux 11 (bullseye)

rezblaze commented 11 months ago

We have been seeing this error for a while:

2023-12-25 23:59:55,291 process=3269118 loglevel=ERROR request_id= correlation_id= logger=faust.transport.drivers.aiokafka _log_slow_processing() L909 [^--AIOKafkaConsumerThread]: Stream has not started processing TP(topic='build_events-BuildEvent.build_id-repartition', partition=4) (started 6.51 days ago).

2023-12-25 23:59:55,291 process=3269118 loglevel=ERROR request_id= correlation_id= logger=faust.transport.drivers.aiokafka _log_slow_processing() L909 [^--AIOKafkaConsumerThread]: Stream has not started processing TP(topic='build_events-BuildEvent.build_id-repartition', partition=1) (started 6.51 days ago).

There are multiple possible explanations for this:

1) The processing of a single event in the stream is taking too long.

    The timeout for this is defined by the stream_processing_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

2) The stream has stopped processing events for some reason.

3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop).

We will get 100's / 1000's of these messages during a large run.

fastapi==0.90.1
uvicorn==0.14.0
python-dateutil==2.8.2
python-dotenv==1.0.0
faust-streaming==0.10.14
starlette-exporter==0.15.1
prometheus_fastapi_instrumentator==5.11.0
schedule==1.2.1
Python 3.11.3

fonty422 commented 6 months ago

Just noting that we see the same thing. In our case, we send 7K messages on initialising the app, but thereafter may go several minutes without sending a single message. When one does send (after the 300 s timer has elapsed), it triggers this. Is it simply triggering because it thinks there should have been a commit in the last 300 s, even though the topic's end offset hasn't changed?

rahmed-hartree commented 4 months ago

This is an issue when the stream is using transactions, i.e. when processing_guarantee is set to exactly_once.

There are two distinct methods used for committing here: https://github.com/faust-streaming/faust/blob/master/faust/transport/consumer.py#L1043-L1050

The checks at https://github.com/faust-streaming/faust/blob/master/faust/transport/drivers/aiokafka.py#L800 depend on the last committed offsets being updated at https://github.com/faust-streaming/faust/blob/master/faust/transport/drivers/aiokafka.py#L720.

However, if you are using transactions, that method is not used when committing offsets. Instead, https://github.com/faust-streaming/faust/blob/master/faust/transport/consumer.py#L324 is used, where the last committed offsets are not updated.

This results in false warnings being logged, because the check sees no offsets being committed.
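
For context, this would apply to any app configured with transactions; a minimal sketch (app name and broker URL are placeholders):

import faust

# Hypothetical app; with exactly_once, commits go through the transaction
# manager path described above, so the consumer thread's last-committed
# bookkeeping is never updated and the warnings fire.
app = faust.App(
    "example-transactional-app",
    broker="kafka://localhost:9092",
    processing_guarantee="exactly_once",
)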

fonty422 commented 4 months ago

So we can ignore these warnings? They have no functional impact? Ideally this would be fixed so it doesn't come up unless necessary, but in the meantime we just ignore these?

rahmed-hartree commented 4 months ago

@fonty422 if you are using transactions, then yes - you can ignore these for now.

thedevd commented 4 months ago

I am facing a similar problem in my app, where all of a sudden, after some time, the consumer stops.

[^--AIOKafkaConsumerThread]: Has not committed TP(topic='workerInput', partition=0) (last commit 6.26 minutes ago).

There are multiple possible explanations for this:

1) The processing of a single event in the stream is taking too long.

    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

2) The commit handler background thread has stopped working (report as bug).

"code_file": "/usr/local/lib/python3.10/dist-packages/faust/transport/drivers/aiokafka.py", "code_line": 904, "code_func": "_log_slow_processing"

I am using:

1. Python 3.10
2. faust-streaming 0.10.13
3. aiokafka 0.8.1
MedAzizTousli commented 3 months ago

Same issue here with faust 0.11.2