aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] How to deal with the background rebalance process #929

Closed vmaurin closed 11 months ago

vmaurin commented 11 months ago

Hi!

In my company, we use aiokafka quite a lot: for basic producer/consumer loops, but we also built our own streaming library on top of it (with stateful transformers).

We quite often stumble on issues caused by the rebalance logic running in a background coroutine, as explained here: https://aiokafka.readthedocs.io/en/stable/kafka-python_difference.html#rebalances-are-happening-in-the-background

It has a lot of consequences and makes some classic patterns incorrect.

The simplest example is the "at least once" consumer loop.

The pattern we used, as with a Java client, follows the example at https://aiokafka.readthedocs.io/en/stable/examples/manual_commit.html:

while True:
    records = await consumer.getmany(...)
    await some_processing(records)
    await consumer.commit()

But this kind of loop fails if the background group rebalance routine runs during the processing. When the loop reaches the commit, it raises a CommitFailedError. If the error is not caught, the worker dies, which triggers a new rebalance, causing yet another rebalance, and so on. We are also under the impression that calling commit without arguments could be risky, as the subscription could have changed in the meantime.

We ended up implementing the above pattern like this:

while True:
    records = await consumer.getmany(...)
    # Record the next offset to commit for each partition before processing
    offsets = {}
    for tp, messages in records.items():
        offsets[tp] = messages[-1].offset + 1
    await some_processing(records)
    try:
        # Commit only the offsets of the batch that was just processed
        await consumer.commit(offsets)
    except (CommitFailedError, IllegalStateError) as err:
        logger.warning("Commit failed due to rebalancing, circle back to consume new messages", exc_info=err)

Is this pattern correct? If so, should the example in the documentation highlight this behavior? Should aiokafka provide helpers for such a pattern? The max_poll_interval_ms documentation and the related error messages are also misleading: the error above suggests changing max_poll_interval_ms, but that won't help, since the failure also happens when the processing time is below max_poll_interval_ms.
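To make the offset bookkeeping in the loop above easier to check, here is a minimal sketch of it as a standalone helper. The names `next_offsets`, `Record`, and the sample batch are hypothetical and only for illustration; the namedtuples are stand-ins for aiokafka's TopicPartition and ConsumerRecord so the snippet runs without a broker.

```python
from collections import namedtuple

# Stand-ins for aiokafka's TopicPartition and ConsumerRecord (assumption:
# only the fields used below are modelled, so no broker is needed).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def next_offsets(records):
    """Map each partition to the offset to commit: last consumed offset + 1."""
    return {
        tp: messages[-1].offset + 1
        for tp, messages in records.items()
        if messages  # skip partitions that returned no messages
    }


# Hypothetical batch shaped like the dict returned by getmany()
batch = {
    TopicPartition("events", 0): [Record(41, b"a"), Record(42, b"b")],
    TopicPartition("events", 1): [Record(7, b"c")],
}
offsets = next_offsets(batch)
print(offsets)
```

The resulting dict is what would be passed to the explicit commit, so only the partitions and offsets of the batch that was actually processed are committed, whatever the assignment looks like by then.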

In the past, we also tried to mutually exclude the rebalance process from the consumer loop with locks, but that also blocks the heartbeat and makes it complicated to keep a consumer alive.