robinhood / faust

Python Stream Processing

Last message in kafka is not getting processed with faust take() method #656

Open sivasai-quartic opened 4 years ago

sivasai-quartic commented 4 years ago

There is already a ticket for a similar issue, where the offset lag is always 1 even after the last record has been processed. This is a different issue: the last message is not processed at all.

I'm using faust 1.10.4.

Steps to reproduce

Add about 10 messages to a single-partition Kafka topic and try reading them with the code below:

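import asyncio
import json

# app, input_topic, record_per_partition, poll_interval and process_payload are defined elsewhere.
@app.agent(input_topic, concurrency=1)
async def my_task(tasks):
    async for batch in tasks.take(record_per_partition, within=poll_interval):
        assert len(batch) > 0
        # Await the gather so the whole batch is processed before the next one is taken.
        await asyncio.gather(*(process_payload(json.loads(args.decode('utf-8'))) for args in batch))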

The last message is not processed when I use the take() method. This happens only with take(); it does not happen with stream.events() or any other method.
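To make the expected batching behaviour concrete, here is a minimal sketch in plain asyncio, with no Faust or Kafka involved. It is not Faust's implementation of take(). The names `take_batches` and `demo`, the batch size of 3, and the 0.1 second timeout are all assumptions chosen for illustration. It only shows the semantics I would expect: a partial final batch is flushed once the `within` timeout elapses, so no trailing message is left behind.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def take_batches(
    queue: "asyncio.Queue[T]", max_: int, within: float
) -> AsyncIterator[List[T]]:
    """Hypothetical helper: yield lists of up to max_ items.

    A partial list is flushed once `within` seconds have passed
    since the batch was started.
    """
    loop = asyncio.get_running_loop()
    while True:
        batch: List[T] = []
        deadline = loop.time() + within
        while len(batch) < max_:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                # Timeout reached: stop filling and flush what we have.
                break
            batch.append(item)
        if batch:
            yield batch


async def demo() -> List[List[int]]:
    # Ten messages, mirroring the reproduction steps above.
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for i in range(10):
        queue.put_nowait(i)

    batches: List[List[int]] = []
    async for batch in take_batches(queue, max_=3, within=0.1):
        batches.append(batch)
        if sum(len(b) for b in batches) == 10:
            break  # all ten messages have been seen
    return batches


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the tenth message arrives in a final batch of one after the timeout fires, which is the behaviour I expected from take() as well.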

Expected behavior

It should process all the records available in Kafka.

Versions

sivasai-quartic commented 4 years ago

@ask can you please confirm whether this is a bug or whether I'm missing something?