robinhood / faust

Python Stream Processing

`faust.command`s seem to need to have their `topic.send` operations flushed with an `await asyncio.sleep(...)` #172

Closed: cpdean closed this issue 5 years ago

cpdean commented 5 years ago

Checklist

faust==1.1.3

Steps to reproduce

Following the withdrawals example, I've built an application with a CLI command that pulls entries from a database and sends them as faust.Record instances to a topic that a worker is consuming from. I noticed that if my faust.command does not end with an await asyncio.sleep(...), the last few messages never appear in Kafka. This is confirmed both by having the faust worker print to the screen and by using Kafka's kafka-console-consumer to listen to the topic.

This is a single-node Kafka cluster set up with docker-compose, following instructions similar to this documentation.

Here is the producer, which does not write the last one to three messages before the Python CLI process ends:

@app.command(
    faust.cli.option('--start'),
    faust.cli.option('--end'),
    faust.cli.option('--count'),
)
async def load_records(self, start: str, end: str, count: str):
    """
    load records from legacy database
    """
    for r_id, dw_changed_date, data in grab_records(start, end, count):
        r = Record(
            r_id=r_id,
            dw_changed_date=dw_changed_date,
            data=data,
        )
        await records.send(key=r.r_id, value=r)

After adding a one-second sleep, both the faust worker and the kafka-console-consumer acknowledge all of the messages:

@app.command(
    faust.cli.option('--start'),
    faust.cli.option('--end'),
    faust.cli.option('--count'),
)
async def load_records(self, start: str, end: str, count: str):
    """
    load records from legacy database
    """
    for r_id, dw_changed_date, data in grab_records(start, end, count):
        r = Record(
            r_id=r_id,
            dw_changed_date=dw_changed_date,
            data=data,
        )
        await records.send(key=r.r_id, value=r)
    await asyncio.sleep(1)

Expected behavior

When writing naive "send this to a topic" code, I expect every message from the generator I am consuming to be visible on the Kafka topic.

Actual behavior

The last few (one to three) messages from the generator are never sent to the topic.

I'm not sure if this is really a faust bug, an aiokafka bug, or an "I don't understand async/await fully" bug, but the behavior makes it look like messages can be lost.

As a user of faust, should I follow every await topic_name.send(...) with an await asyncio.sleep(0.01) to make sure it is flushed?
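Or would explicitly stopping the producer when the command finishes be the supported way to flush? Something like the following sketch, which assumes app.producer is a mode service whose stop() drains whatever is still buffered (I haven't verified that contract):

@app.command(
    faust.cli.option('--start'),
    faust.cli.option('--end'),
    faust.cli.option('--count'),
)
async def load_records(self, start: str, end: str, count: str):
    """
    load records from legacy database
    """
    try:
        for r_id, dw_changed_date, data in grab_records(start, end, count):
            r = Record(
                r_id=r_id,
                dw_changed_date=dw_changed_date,
                data=data,
            )
            await records.send(key=r.r_id, value=r)
    finally:
        # Assumption: stopping the producer service drains messages
        # still buffered in the underlying client before we return.
        await app.producer.stop()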


ask commented 5 years ago

Before Faust 1.1.3, the producer was configured to batch messages but, by accident, didn't actually do so, because we were waiting for each individual message to be sent.

I'm guessing in the "producer command" example these messages are lost simply because the producer is not fully stopped before the process exits.
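To illustrate the distinction at the aiokafka layer (the driver faust uses), here is a sketch: send() only enqueues the message into the client's batch buffer and hands back a delivery future, so nothing guarantees the batch has reached the broker until you await that future (or call stop(), which flushes pending batches before closing). The topic name and broker address below are placeholders:

import asyncio
from aiokafka import AIOKafkaProducer

async def produce():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    await producer.start()
    try:
        # send() returns as soon as the message sits in the batch buffer;
        # the future it hands back resolves once the broker acks delivery.
        fut = await producer.send('my-topic', b'buffered, not yet delivered')
        metadata = await fut  # blocks until the broker has actually acked
        print(metadata.topic, metadata.partition, metadata.offset)
    finally:
        # stop() flushes any batches still sitting in the buffer
        # before closing connections.
        await producer.stop()

loop = asyncio.get_event_loop()
loop.run_until_complete(produce())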

Somewhat surprisingly, the default behavior in Python is to shut down immediately after a signal, and when using asyncio, finally blocks are not executed at all, not even when a KeyboardInterrupt exception is raised:

import asyncio

async def main():
    try:
        await asyncio.sleep(10.0)
    finally:
        print('DONE!')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

If you hit Ctrl-C before the sleep ends, the "DONE!" message is never printed.
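One way to get those finally blocks to run anyway is to intercept the KeyboardInterrupt outside the loop, cancel the task, and let the loop unwind it before exiting. This is a sketch of the general asyncio pattern, not what mode does internally:

import asyncio

async def main():
    try:
        await asyncio.sleep(10.0)
    finally:
        print('DONE!')

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt:
    # Cancel the task, then run the loop long enough for the
    # cancellation to propagate and the finally block to execute.
    task.cancel()
    loop.run_until_complete(asyncio.gather(task, return_exceptions=True))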

In 1.2 I have moved all the faust commands (including user-defined commands) to be executed by mode.worker, so they will have the same environment as the worker command. Additionally, we should make it so that app.producer(), if started, is always stopped before the command process exits.

cpdean commented 5 years ago

Interesting.

Thanks!