faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Warning - producer buffer full #173

Open deadpassive opened 3 years ago

deadpassive commented 3 years ago

Steps to reproduce

We have several streaming agents: some update tables, others send to other topics. In steady state this works fine, but when ingesting peaks of data (e.g. when resetting consumer offsets to 2 and re-ingesting large amounts of data) we get a lot of warnings (functionality seems to be unaffected).
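
For context, a minimal sketch of the kind of topology described above; all names (app, topics, table, agents) are hypothetical and not taken from the actual deployment:

import faust

app = faust.App("example-app", broker="kafka://localhost:9092")

value_topic = app.topic("values", value_type=str)
out_topic = app.topic("values-out", value_type=str)

# Table-backed state; every update also produces a message to the changelog topic.
counts = app.Table("value-counts", default=int)

@app.agent(value_topic)
async def count_values(stream):
    async for key, value in stream.items():
        counts[key] += 1  # each increment sends one changelog message

@app.agent(value_topic)
async def forward_values(stream):
    async for value in stream:
        await out_topic.send(value=value)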

Expected behavior

We don't get any warnings

Actual behavior

We get a bunch of warnings about producer buffer size.

Attempted fixes

We've managed to resolve this by:

1. Increasing max_messages (a sketch of where this assignment could live follows the snippets below):

app.producer.buffer.max_messages = 10000

2. Adding await asyncio.sleep(0.1) inside the agent:

@app.agent(value_topic)
async def handle_new_value(stream):
    async for value in stream:
        # Throttle each event slightly so the producer buffer can drain.
        await asyncio.sleep(0.1)
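
Where exactly the max_messages assignment should live isn't shown in the thread; one option (an assumption, since app.producer is created lazily and the ordering relative to producer startup isn't verified here) is a task that runs when the worker starts:

import faust

app = faust.App("example-app", broker="kafka://localhost:9092")

@app.task
async def raise_producer_buffer_limit():
    # Same attribute as in the workaround above; 10000 is the value
    # that worked for the original reporter.
    app.producer.buffer.max_messages = 10000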

Full traceback

[2021-07-15 16:35:08,344] [283726] [WARNING] producer buffer full size 102 
[2021-07-15 16:35:22,189] [283726] [INFO] producer flush took 13.844183206558228 

Versions

novotl commented 2 years ago

I'm observing the same issue as you. If you're using a Table for state, there is a similar discussion in this Slack thread:

we use the threaded producer for the changelogs that gives better performance

which I believe refers to this

https://github.com/faust-streaming/faust/blob/d9533ca36afc2d0731095078438f748bc739d943/faust/types/settings/settings.py#L1330-L1342

You might want to give that a shot. I can't help much further, as I'm still setting up my first Faust pipeline myself.
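
If the linked lines are the producer_threaded setting (an assumption based on the comment above about the threaded producer, not verified against that exact line range), enabling it might look like:

import faust

app = faust.App(
    "example-app",
    broker="kafka://localhost:9092",
    # Assumption: producer_threaded is the setting referenced above; it moves
    # produce calls (including changelog writes) onto a separate producer thread.
    producer_threaded=True,
)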

aliosia commented 1 year ago

I have the same issue. I think it's because of table updates: the producer tries to write to the changelog topic once per message, and its buffer fills up. I also couldn't find a way to batch messages and update the table once per batch. Take a look at this issue.
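
For reference, one pattern that can reduce per-message table/changelog writes (a sketch only, and not necessarily the solution mentioned in the next comment) is Stream.take(), which yields lists of events so the table can be touched once per key per batch; names here are hypothetical:

import faust

app = faust.App("example-app", broker="kafka://localhost:9092")
value_topic = app.topic("values", value_type=str)
counts = app.Table("value-counts", default=int)

@app.agent(value_topic)
async def handle_batches(stream):
    # Collect up to 1000 events, or whatever arrives within 1 second.
    async for values in stream.take(1000, within=1.0):
        # Aggregate in memory first, then update the table once per key per
        # batch, so fewer changelog messages are produced.
        batch_counts = {}
        for value in values:
            batch_counts[value] = batch_counts.get(value, 0) + 1
        for key, n in batch_counts.items():
            counts[key] += n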

aliosia commented 1 year ago

@deadpassive I found the solution and answered it here.