aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] How to get old messages from topics? #998

Open solidguy7 opened 2 months ago

solidguy7 commented 2 months ago

Of course, I can already get messages in real time with a consumer:

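import asyncio

from aiokafka import AIOKafkaConsumer

async def test():
    consumer = AIOKafkaConsumer("test", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # With no group_id and the default auto_offset_reset="latest", only new messages arrive
        async for message in consumer:
            return message.value.decode()
    finally:
        await consumer.stop()

if __name__ == "__main__":
    print(asyncio.run(test()))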

But how can I get older messages from this topic, or find a specific one?

ods commented 2 months ago

You can consume without a consumer group and start from the beginning (see the seek_to_beginning method), or from any other offset in the past (e.g. see the offsets_for_times and seek methods).

solidguy7 commented 2 months ago

Thanks for your reply! I've actually tried this approach, but I got stuck. Here is my test snippet:

import asyncio

from aiokafka import AIOKafkaConsumer, TopicPartition

async def test():
    consumer = AIOKafkaConsumer("test", bootstrap_servers="localhost:9092")
    try:
        tp = TopicPartition(topic="test", partition=0)
        consumer.assign([tp])
        await consumer.seek_to_beginning(tp)
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(test())

Here I create the partition manually, because consumer.assignment() returns an empty set (I don't know why). This snippet raises the following error:

    raise IllegalStateError(
aiokafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive

I don't understand what I'm doing wrong.

ods commented 2 months ago

You don't need to assign partitions yourself if you don't use groups and aren't going to distribute messages among workers. But you do need to seek on every partition the topic has. Use the partitions_for_topic method to get the list of all available partitions.

solidguy7 commented 2 months ago

I have also tried that method. It returns None, even though my test topic has been created.