aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Wrong last consumed offset when using transaction #911

Open vmaurin opened 11 months ago

vmaurin commented 11 months ago

Describe the bug

Here is what I posted on the Kafka users mailing list:

I am working on an exactly-once stream processor in Python, using the aiokafka client library. My program stores a state in memory that is recovered from a changelog topic, like in Kafka Streams. On each processing loop, I consume messages and produce messages to an output topic and to my changelog topic, within a transaction. When I need to restart a runner, to restore the state in memory, I have a routine that consumes the changelog topic from the beginning to the "end" with a read_committed isolation level. Here I am struggling to define when to stop my recovery:

  • my current (maybe) working solution is to loop over "poll" until poll stops returning any messages
  • I tried to do something based on the end offsets instead, checking the consumer position against them, but with control messages at the end of the partition I run into an issue where the position stays one below the end offset and never goes further (see the sketch below)
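
For illustration, the end-offsets variant I tried looks roughly like this (a sketch, not the exact code from my project; tp is the assigned changelog partition and consumer is the read_committed consumer shown further down):

# Sketch of the position / end-offsets based stop condition.
end_offset = (await consumer.end_offsets([tp]))[tp]
while await consumer.position(tp) < end_offset:
    batch = await consumer.getmany(tp, timeout_ms=1000, max_records=500)
    # ... restore the state from the returned records here ...
# Expected: position() eventually reaches end_offset, stepping over the
# transaction marker. Observed: when the last record is a marker, position()
# stays at end_offset - 1 and the loop never exits.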

and the response I got

Sounds like a bug in the aiokafka library to me. If the last message in a topic partition is a tx-marker, the consumer should step over it and report the correct position after the marker. The official KafkaConsumer (i.e., the Java one) does exactly that.

Expected behaviour

So, as mentioned in the response I got, when a consumer receives a control message it should "step over" it in terms of consumed offset. To be honest, I am not 100% sure that this is the behavior of the Java KafkaConsumer either, as Kafka's internal tools that report the lag (like kafka-consumer-groups.sh) also show a lag of 1 or 2.

Environment (please complete the following information):

Reproducible example

Not a reproducible example, but here is my recovery code:

from aiokafka import AIOKafkaConsumer
from aiokafka.structs import TopicPartition

# Read the changelog partition with read_committed isolation, from the
# beginning, until getmany() stops returning records.
consumer = AIOKafkaConsumer(
    bootstrap_servers=self._bootstrap_servers,
    isolation_level="read_committed",
    enable_auto_commit=False,
)
offset = -1
await consumer.start()
try:
    tp = TopicPartition(topic=self._changelog_topic, partition=state_partition.partition)
    consumer.assign([tp])
    await consumer.seek_to_beginning(tp)
    recovering = True
    while recovering:
        recovering = False
        msg_batch = await consumer.getmany(timeout_ms=1000, max_records=500)
        processing_listener.check_for_cancellation()
        for _, msgs in msg_batch.items():
            recovering = True
            # Replay the batch into the in-memory state and remember the last
            # restored offset.
            await state_partition.restore(msgs)
            offset = msgs[-1].offset
finally:
    await consumer.stop()

Instead of the while recovering loop, when I try something based on offset < end_offset (or end_offset - 1, I never remember the exact logic here), it doesn't work when the latest messages are control ones, roughly as in the sketch below.
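
Something like this (a rough sketch, reusing the names from the snippet above; this is the variant that never terminates):

# Sketch of the end-offset based exit condition, reusing tp, consumer,
# state_partition and offset from the snippet above.
end_offset = (await consumer.end_offsets([tp]))[tp]
while offset < end_offset - 1:
    msg_batch = await consumer.getmany(tp, timeout_ms=1000, max_records=500)
    for _, msgs in msg_batch.items():
        await state_partition.restore(msgs)
        offset = msgs[-1].offset
# With a single trailing transaction marker, the last data record has offset
# end_offset - 2, so offset never reaches end_offset - 1 and the loop keeps
# polling forever.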

vmaurin commented 8 months ago

Could it be related to the fact that I am doing manual commits? I.e. no "real" messages are consumed, so the consumer loop doesn't trigger and it is not committing offsets? Any idea how a consuming loop could trigger a commit on transactional messages?
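
The obvious thing I could try is an explicit commit after each restored batch, something like the sketch below (it assumes the consumer is created with a group_id, which the snippet above does not set; commit() needs one). It still doesn't step over trailing control records, since getmany() never returns them:

# Sketch: commit the offset after the last restored record of each batch.
# Assumes the consumer was constructed with a group_id.
for tp, msgs in msg_batch.items():
    await state_partition.restore(msgs)
    await consumer.commit({tp: msgs[-1].offset + 1})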

y4n9squared commented 6 months ago

We're also seeing this issue. It feels like the library should support iterating over the end-of-transaction record. Both the kafka-python and confluent_kafka consumer implementations allow the consumer to reach the HWM even when the last record is a control record.
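
Concretely, the invariant we would like to hold after draining a partition is roughly the following (a sketch; highwater() returns the high watermark known from the last fetch, and tp is the assigned partition):

# Sketch of the expected behaviour after draining the partition.
await consumer.getmany(tp, timeout_ms=1000)
assert await consumer.position(tp) == consumer.highwater(tp)
# With aiokafka, this fails when the last record is a control record:
# position() stops one (or more) offsets short of the high watermark.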