Open bohdantan opened 4 years ago
With concurrency > 1, messages are processed out of order. You won't see this behavior with concurrency=1.
I was also curious about this behavior. I was expecting messages to be processed out of order, but offsets to be committed in order. The current design could clearly lead to data loss in the case of a failure and restart.
If the service crashes and then restarts, the committed topic-partition offset may already be past the message that made the agent crash, so that message is never reprocessed.
This is a very important issue with Faust's concurrency: even if the event-processing logic is built entirely with out-of-order execution in mind, the out-of-order offset commits will cause data loss when the consumer process is interrupted.
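To make the expected behavior concrete, here is a minimal, hypothetical sketch (not Faust's actual API) of the commit strategy the commenters above are asking for: messages may finish out of order, but the commit point only advances past offsets that are contiguously complete, so a gap below blocks the commit.

```python
# Hypothetical sketch, not Faust internals: track out-of-order completions
# but only ever commit in order.
class InOrderCommitTracker:
    def __init__(self, first_offset: int) -> None:
        # Next offset expected to complete; everything below it is committed.
        self.committable = first_offset
        self.done = set()

    def ack(self, offset: int) -> int:
        """Mark `offset` as fully processed and advance the commit point
        past any now-contiguous run of completed offsets."""
        self.done.add(offset)
        while self.committable in self.done:
            self.done.remove(self.committable)
            self.committable += 1
        return self.committable  # safe commit position (exclusive)


tracker = InOrderCommitTracker(first_offset=0)
tracker.ack(1)   # offset 0 still in flight: commit point must not move
assert tracker.committable == 0
tracker.ack(0)   # gap filled: commit point jumps past both offsets
assert tracker.committable == 2
```

With this scheme a crash can cause at-least-once redelivery of already-processed messages, but never skips an unprocessed one.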
Checklist

- [x] I have verified that the issue persists when using the `master` branch of Faust.

Steps to reproduce
```
pip install git+https://github.com/robinhood/faust@b6ebdd955c1de5ceed81f53ade76b4b04561151e
```
`example.py`:

```python
import asyncio

import faust
from faust.sensors.monitor import Monitor


class PrintCommitMonitor(Monitor):
    def on_tp_commit(self, tp_offsets):
        print(f'Commit offsets {tp_offsets}')


app = faust.App(
    'test-app',
    broker='kafka://localhost:9092',
    value_serializer='raw',
    Monitor=PrintCommitMonitor,
)

test_topic = app.topic('test')


@app.agent(test_topic, concurrency=2)
async def process(stream):
    async for msg in stream:
        print(f'Start processing {msg} - by {stream.concurrency_index}')
        if msg == b'Message i=0':
            await asyncio.sleep(20)
        else:
            await asyncio.sleep(3)
        print(f'Finish processing {msg} - by {stream.concurrency_index}')


@app.task(on_leader=True)
async def on_started():
    print('Add messages')
    for i in range(10):
        msg = f'Message i={i}'
        await test_topic.send(key=msg, value=msg)
```
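The repro above makes one worker sleep 20 s on offset 0 while the other worker races ahead. The hazard the reporters describe can be illustrated without Kafka at all: if the consumer commits based on the highest finished offset while a lower offset is still in flight, a crash at that moment skips the unfinished message on restart. This is a plain-Python illustration of the failure mode, not a trace of Faust's actual commit code.

```python
# Illustration of the data-loss hazard (plain Python, no Kafka involved).
finished = [1, 2, 3]           # offsets finished quickly by the fast worker
in_flight = 0                  # offset 0 still sleeping in the slow worker

# Out-of-order commit: commit position derived from the *highest* finished
# offset, ignoring the gap at offset 0.
committed = max(finished) + 1  # commit position 4

# After a crash and restart, consumption resumes at the committed position,
# so anything below it that never finished is silently lost.
resume_at = committed
lost = [o for o in [in_flight] if o < resume_at]
assert lost == [0]             # the in-flight message is never reprocessed
```

An in-order commit (only advancing past contiguously completed offsets) would instead hold the commit position at 0 until the slow message finishes, trading data loss for possible redelivery.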