aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

seek_to_committed does not work when committed offset is 0 #1003

Open t7wakim opened 2 months ago

t7wakim commented 2 months ago

Hi,

The seek_to_committed method has a small bug: it does not work when the last committed offset of a TopicPartition is 0 (i.e., nothing was ever committed). This happens if you fetch messages with getmany() or getone() from a never-committed-to TopicPartition with auto commit disabled. Let's say the messages couldn't be processed and you want to call seek_to_committed to go back to the committed offset, which is 0 in this case: no seek is performed.

Snippet of code from the seek_to_committed method:

            if offset and offset > 0:
                self._fetcher.seek_to(tp, offset)
        return committed_offsets

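I believe the if statement above should also accept an offset of 0. Note that if offset and offset >= 0: would not be enough, because 0 is falsy in Python and the first half of the condition already fails; an explicit check such as if offset is not None and offset >= 0: would be needed.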
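To make the truthiness issue concrete, here is a minimal sketch that evaluates the quoted condition and two candidate replacements for a few offsets. The function names are hypothetical and exist only for this illustration; the explicit `is not None` check is one possible fix, not necessarily the one the maintainers would choose.

```python
def should_seek_original(offset):
    # Condition quoted from the seek_to_committed snippet above.
    return bool(offset and offset > 0)


def should_seek_ge(offset):
    # Only the comparison changed to >=; "offset and ..." still
    # short-circuits on 0 because 0 is falsy.
    return bool(offset and offset >= 0)


def should_seek_explicit(offset):
    # Assumed alternative: test for None explicitly so that 0 is accepted.
    return offset is not None and offset >= 0


for offset in (None, 0, 5):
    print(
        offset,
        should_seek_original(offset),
        should_seek_ge(offset),
        should_seek_explicit(offset),
    )
```

Only the explicit check returns True for an offset of 0 while still rejecting None.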

To work around this, I had to do the following, which works as expected:

    current_committed_offset = await consumer.committed(tp)
    consumer.seek(tp, current_committed_offset)
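The workaround can be wrapped in a small helper. This is only a sketch: the helper name, the default_offset parameter, and the StubConsumer class are hypothetical and are here so the example runs without a broker. It also assumes that committed() may return None when nothing was ever committed, in which case falling back to 0 may or may not be what you want (for example, if the log no longer starts at offset 0).

```python
import asyncio


async def seek_to_committed_or_default(consumer, tp, default_offset=0):
    """Seek tp to its committed offset, treating 0 as a valid target.

    Assumption: consumer.committed(tp) is awaitable and returns an int or
    None, and consumer.seek(tp, offset) is a plain (non-async) call, as in
    the workaround above.
    """
    committed = await consumer.committed(tp)
    # Compare against None explicitly so a committed offset of 0 is kept.
    target = default_offset if committed is None else committed
    consumer.seek(tp, target)
    return target


class StubConsumer:
    """Hypothetical stand-in for a real consumer, for demonstration only."""

    def __init__(self, committed_offset):
        self._committed_offset = committed_offset
        self.seeks = []

    async def committed(self, tp):
        return self._committed_offset

    def seek(self, tp, offset):
        # Record the call instead of talking to a broker.
        self.seeks.append((tp, offset))


stub = StubConsumer(committed_offset=0)
result = asyncio.run(seek_to_committed_or_default(stub, ("my-topic", 0)))
print(result, stub.seeks)
```

With a real consumer, the same helper would be awaited from inside the consuming coroutine after a failed batch, in place of seek_to_committed.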

Thanks