faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Auto-commit in the processing thread? #482

Open jamesellis1999 opened 1 year ago

jamesellis1999 commented 1 year ago


Steps to reproduce

I have created a simple function which blocks for 10s for each message. I run this using the vscode debugger.

import faust
import time

app = faust.App("test", broker="localhost:9093")

topic = app.topic("new-topic", value_type=str)

@app.agent(topic)
async def processor(stream):
    async for message in stream:
        time.sleep(10)

if __name__ == "__main__":
    app.main()

Expected behavior

Commits to occur in the background thread, not the processing thread.

The documentation states that auto-commits should occur in a background thread: https://faust-streaming.github.io/faust/userguide/streams.html#message-life-cycle This is also stated in faust/transport/consumer.py:34:

      + The Consumer has a background thread that periodically commits the
        offset.

Actual behavior

I am seeing commits occur in the main processing thread.

(Screenshot, 2023-03-23 12:16:49: commit occurring in the main processing thread)

Blocking processor calls cause the auto-commit timer to lose its place, leading to infrequent commits. In this example, a commit only occurs after three messages have been processed. One way to work around this is to wrap the blocking call in a thread executor:

import asyncio
import time

@app.agent(topic)
async def processor(stream):
    async for message in stream:
        loop = asyncio.get_running_loop()
        print("going to sleep")
        # Run the blocking call in the default thread pool so the
        # event loop stays responsive.
        await loop.run_in_executor(None, time.sleep, 10)

If the commit happened in a different thread, like the consumer heartbeat does, this would not be an issue.
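A lighter variant of the same workaround, assuming Python 3.9+ where asyncio.to_thread is available (blocking_work here is a hypothetical stand-in for the slow call):

```python
import asyncio
import time

def blocking_work(message):
    # Hypothetical stand-in for the slow, blocking processing step.
    time.sleep(0.1)
    return message.upper()

async def handle(message):
    # asyncio.to_thread runs the blocking function in a worker thread,
    # keeping the event loop (and with it Faust's commit timer) free.
    return await asyncio.to_thread(blocking_work, message)

result = asyncio.run(handle("hello"))
```

Inside an agent this would simply be `await asyncio.to_thread(blocking_work, message)` in the `async for` body.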

Full traceback

[2023-03-23 12:44:39,232] [65292] [INFO] Timer Monitor.sampler woke up too late, with a drift of +19.581969500000014 runtime=0.00015591600003972417 sleeptime=20.581969500000014 
[2023-03-23 12:44:39,239] [65292] [INFO] Timer Recovery.stats woke up too late, with a drift of +16.54871479099984 runtime=4.708400047093164e-05 sleeptime=21.54871479099984 
[2023-03-23 12:44:49,253] [65292] [INFO] Timer commit woke up too late, with a drift of +18.804260166999665 runtime=10.017661875000158 sleeptime=21.604260166999666 
[2023-03-23 12:44:49,255] [65292] [WARNING] Timer commit is overlapping (interval=2.8 runtime=10.017661875000158) 


DBBrowne commented 1 year ago

+1 for a similar issue.

In situations where processing one message takes only 3-5 seconds, but the agent receives a large enough burst of messages to exceed the leave_group timer, the Kafka-side offset is incremented but the Faust auto-commit is not triggered, and Faust voluntarily leaves the group. The processor/service must then be restarted to rejoin and resume work on the burst of messages.
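One possible mitigation (a sketch only, not verified against this burst scenario) is to give the consumer more headroom before the group coordinator considers it gone. The setting names below appear in the faust-streaming settings reference; the app name and values are illustrative:

```python
import faust

app = faust.App(
    "burst-tolerant-app",  # hypothetical name
    broker="kafka://localhost:9092",
)
# Allow longer gaps between polls before the broker considers the
# consumer dead and triggers a rebalance (values are illustrative).
app.conf.broker_session_timeout = 120.0
app.conf.broker_max_poll_interval = 600.0
```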

wbarnha commented 1 year ago

You may be interested in these settings:

https://faust-streaming.github.io/faust/userguide/settings.html#std-setting-broker_commit_interval https://faust-streaming.github.io/faust/userguide/settings.html#std-setting-broker_heartbeat_interval

I adapted the word_count example to include your use case:

#!/usr/bin/env python
import asyncio
import time

import faust

WORDS = ['the', 'quick', 'brown', 'fox']

app = faust.App(
    'word-counts',
    broker='kafka://localhost:9092',
    store='rocksdb://',
    version=1,
    topic_partitions=8,
)
app.conf.broker_commit_interval = 15.0
app.conf.broker_heartbeat_interval = 20.0

posts_topic = app.topic('posts', value_type=str)
word_counts = app.Table('word_counts', default=int,
                        help='Keep count of words (str to int).')

@app.agent(posts_topic)
async def shuffle_words(posts):
    async for post in posts:
        for word in post.split():
            await count_words.send(key=word, value=word)

last_count = {w:0 for w in WORDS}
@app.agent(value_type=str)
async def count_words(words):
    """Count words from blog post article body."""
    async for word in words:
        word_counts[word] += 1
        last_count[word] = word_counts[word]

@app.page('/count/{word}/')
@app.table_route(table=word_counts, match_info='word')
async def get_count(web, request, word):
    return web.json({
        word: word_counts[word],
    })

@app.page('/last/{word}/')
@app.topic_route(topic=posts_topic, match_info='word')
async def get_last(web, request, word):
    return web.json({
        word: last_count[word],
    })

@app.task
async def sender():
    await posts_topic.maybe_declare()

    for word in WORDS:
        for _ in range(1000):
            # time.sleep(10)
            await shuffle_words.send(value=word)

    await asyncio.sleep(5.0)
    print(word_counts.as_ansitable(
        key='word',
        value='count',
        title='$$ TALLY $$',
        sort=True,
    ))

@app.on_rebalance_complete.connect
async def on_rebalance_complete(sender, **kwargs):
    print(word_counts.as_ansitable(
        key='word',
        value='count',
        title='$$ TALLY - after rebalance $$',
        sort=True,
    ))

@app.agent(posts_topic)
async def processor(stream):
    async for message in stream:
        time.sleep(10)
        print(message)

if __name__ == '__main__':
    app.main()

It definitely reduces the frequency of "Timer commit is overlapping" warnings.

I should also note that, given the default interval values I've seen in faust-streaming and mode-streaming, I don't think a Faust stream is really intended for messages that take 10 seconds to process.

jamesellis1999 commented 1 year ago

There is still the issue that the documentation appears to be wrong.