aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

consumer_timeout ignored? #132

Open ZmeiGorynych opened 7 years ago

ZmeiGorynych commented 7 years ago

Hi,

when I run an aiokafka consumer with consumer_timeout_ms=100, I expect it to terminate 100 ms after reaching the end of the topic; instead (as demonstrated in the attached notebook) it hangs, presumably waiting for the next record to show up, until interrupted from the keyboard.

Is it a bug or am I doing something wrong?

Thanks a lot! consumer_timeout ignored.zip

tvoinarovskyi commented 7 years ago

Hmm, this seems to be a misunderstanding from the initial port from kafka-python (I presume the behaviour you describe comes from there). This option is not part of Java's API, so we probably just ended up with a parameter that shares the name but not the semantics. Our docs don't describe any behaviour like that:

consumer_timeout_ms (int) – maximum wait timeout for background fetching routine. 
Mostly defines how fast the system will see rebalance and request new data for new 
partitions. Default: 200

As for your case, something like this should work:

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError

loop = asyncio.get_event_loop()
kafka_host = 'localhost:9092'  # replace with your broker address

src = AIOKafkaConsumer('my_topic', loop=loop, bootstrap_servers=kafka_host,
                       group_id=None, consumer_timeout_ms=100,
                       auto_offset_reset='earliest')

loop.run_until_complete(src.start())

async def consume_task():
    try:
        async for msg in src:
            print(msg)
    except KafkaError as err:
        print("error while consuming message: ", err)

async def consumer_task_timeout():
    try:
        # Cancel consumption if the iteration does not finish within 1.5 seconds.
        await asyncio.wait_for(consume_task(), timeout=1.5)
    except asyncio.TimeoutError:
        print("Task timed out")

loop.run_until_complete(consumer_task_timeout())
loop.run_until_complete(src.stop())

P.S. Please also paste your code as explicit code blocks, so other people can understand what you're asking without having to install IPython.

ZmeiGorynych commented 7 years ago

Thanks a lot, that clarifies it!

ZmeiGorynych commented 7 years ago

What I really wanted was to read Kafka topics of unknown but finite length. As long as new records keep arriving I'm happy to wait indefinitely, but when no more records are forthcoming I want to stop; so a fixed-length timeout on the whole coroutine won't help here.

This could have been achieved with a timeout on an individual record poll (as with the kafka-python parameter that started this thread), but since there is apparently no such thing in aiokafka, what I do instead is compare the current record offset to the high watermark of that partition, using the highwater() function.
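For reference, a minimal sketch of that highwater comparison (assuming a started AIOKafkaConsumer named consumer, set up as in the example above, and a recent aiokafka where TopicPartition is importable from the top-level package):

from aiokafka import TopicPartition

async def consume_until_highwater(consumer):
    # Stop once the consumed offset reaches the partition's high watermark.
    async for msg in consumer:
        print(msg)
        tp = TopicPartition(msg.topic, msg.partition)
        highwater = consumer.highwater(tp)  # offset of the next record to be appended, or None
        if highwater is not None and msg.offset >= highwater - 1:
            break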

tvoinarovskyi commented 7 years ago

@ZmeiGorynych I think your approach is the best way to do it. The stopping point is dictated by the broker's actual high watermark, not by anything defined in client code.

ZmeiGorynych commented 7 years ago

An even better way of doing it is the getmany() function, whose timeout_ms argument does exactly what I want!

A problem with my earlier approach was that highwater() is only available after the first fetch, so it couldn't be used for empty topics.
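A rough sketch of that getmany() variant, again assuming a started consumer (getmany() returns an empty dict when nothing arrives within timeout_ms):

async def drain_topic(consumer):
    while True:
        batches = await consumer.getmany(timeout_ms=1000)
        if not batches:
            # No records within timeout_ms: treat the topic as drained.
            break
        for tp, messages in batches.items():
            for msg in messages:
                print(msg)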

tvoinarovskyi commented 7 years ago

Can you describe your use case in detail? It may be good to add an API for doing exactly that.

ZmeiGorynych commented 7 years ago

Consuming realtime data from several Kafka topics, in no particular order. Because of assumptions I have about the data, if there's no data from a given topic for n seconds I want to send a heartbeat instead and keep listening. So getmany() is perfect: if it comes back empty after timeout_ms, I know I need to send a heartbeat.
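For illustration, a hedged sketch of that heartbeat loop; send_heartbeat() and process() are hypothetical application callbacks, not part of aiokafka:

async def consume_with_heartbeat(consumer, n_seconds):
    while True:
        batches = await consumer.getmany(timeout_ms=n_seconds * 1000)
        if not batches:
            # No data from any subscribed topic for n seconds:
            # emit a heartbeat and keep listening.
            await send_heartbeat()
            continue
        for tp, messages in batches.items():
            for msg in messages:
                process(msg)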