robinhood / faust

Python Stream Processing

Offset not committed after partition returned back to worker after 2 rebalances #681

Open bohdantan opened 3 years ago



## Steps to reproduce

```python
import asyncio

import faust
from faust.sensors.monitor import Monitor


class PrintCommitMonitor(Monitor):
    def on_tp_commit(self, tp_offsets):
        print(f'Commit offsets {tp_offsets}')


app = faust.App(
    'test-app',
    broker='kafka://localhost:9092',
    value_serializer='raw',
    Monitor=PrintCommitMonitor,
)

test_topic = app.topic('test')


@app.agent(test_topic)
async def process(stream):
    async for msg in stream:
        print(f'Start processing {msg}')
        await asyncio.sleep(10)
        print(f'Finish processing {msg}')


@app.task(on_leader=True)
async def on_started():
    print('Add messages')
    for i in range(10):
        for p in range(2):
            msg = f'Message partition={p} i={i}'
            await test_topic.send(key=msg, value=msg, partition=p)
```



- Start a worker: `faust -A example worker -l info --without-web`. The task will write 10 messages to both partitions, then the agent will start processing messages from both partitions.
- Wait until the offset is committed for both partitions.
- Start a second worker instance: `faust -A example worker -l info --without-web`. A rebalance occurs and one of the partitions (assume partition 1) is assigned to the second worker.
- Wait until the second worker commits the offset, then press Ctrl+C to stop it.
- Another rebalance occurs and partition 1 is returned to the first worker.
- The first worker processes messages from both partitions, but it never commits the offset for partition 1.

## Expected behavior

First worker should commit offset for both partitions.

## Actual behavior

The first worker does not commit the offset for partition 1. This happens because the acked state (`self._acked` and related attributes) is not cleared in `faust.transport.consumer.Consumer` when new partitions are set after a rebalance. Given the following sequence:
- the worker had the partition, processed messages, and committed an offset, e.g. 5
- the worker lost the partition
- another worker instance got that partition, processed messages, and committed an offset, e.g. 10
- the partition was returned to the first worker instance

the worker is left with a dirty acked state: it resumes processing from offset 10, but it never commits an offset for that partition, because offsets 5-10 will never be added to the acked state.
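The failure mode can be sketched with a toy commit calculation. Here `committable_offset` is a hypothetical stand-in for faust's actual commit logic (which finds the highest contiguous run of acked offsets), not the real implementation:

```python
def committable_offset(acked, committed):
    """Return the highest offset safe to commit: the end of the
    contiguous run of acked offsets following `committed`."""
    offset = committed
    while offset + 1 in acked:
        offset += 1
    return offset

# Before the rebalance: the worker acked offsets 1-5 and committed 5.
acked = {1, 2, 3, 4, 5}
committed = committable_offset(acked, 0)
assert committed == 5

# The partition leaves and returns; another worker committed 10 meanwhile.
# The stale acked set is kept, and new processing acks from 11 onward.
acked |= {11, 12, 13}

# Offsets 6-10 are never acked by this worker, so the contiguous run
# after 5 never grows: the commit is stuck at 5 forever.
assert committable_offset(acked, committed) == 5
```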

It seems the acked state needs to be cleared when new partitions are set.
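Continuing the toy sketch above (again with the hypothetical `committable_offset` helper, not faust's real code): if the acked set is cleared on assignment and the committed offset is re-read from the broker, the contiguous run starts at the broker's offset and commits can advance again:

```python
def committable_offset(acked, committed):
    """Highest offset safe to commit: end of the contiguous acked run."""
    offset = committed
    while offset + 1 in acked:
        offset += 1
    return offset

# Same scenario, but the acked set is cleared when the partition is
# reassigned, and the committed offset is taken from the broker (10,
# committed by the other worker while the partition was away).
acked = set()          # cleared on assignment
committed = 10         # fetched from the broker after rebalance
acked |= {11, 12, 13}  # processing resumes at offset 11

# No stale gap, so the commit can advance to 13.
assert committable_offset(acked, committed) == 13
```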

## Versions

* Python version - 3.8.6
* Faust version - git+https://github.com/robinhood/faust@b6ebdd955c1de5ceed81f53ade76b4b04561151e
* Operating system - Ubuntu 20.04.1
* Kafka version - 2.4.1