dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

KafkaProducer.flush does not throw the timeout exception mentioned in the API doc #1347

Open bgedik opened 6 years ago

bgedik commented 6 years ago

The flush() method of KafkaProducer only logs the following warning, although the API documentation says it raises a timeout exception:

kafka.producer.record_accumulator|await_flush_completion|WARNING|KafkaTimeoutError: Batch for TopicPartition(topic='unscrambl_test_topic', partition=0) containing 1 record(s) expired: 30 seconds have passed since last append

dpkp commented 6 years ago

flush() raises an exception only if it cannot complete all pending futures within its timeout. A future also counts as completed when an exception is assigned to it, and that exception can itself be a KafkaTimeoutError. The warning simply means that the send timed out internally before your flush timeout was reached. I hope that makes sense!
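To illustrate the distinction, here is a minimal sketch of inspecting each send future after flush() returns. The helper name `send_and_check` is hypothetical and not part of kafka-python. It assumes `producer` behaves like a `KafkaProducer`, whose `send()` returns a future exposing `failed()` and an `exception` attribute.

```python
def send_and_check(producer, topic, payloads, flush_timeout=None):
    """Send payloads, flush, and return (payload, exception) pairs for failed sends.

    Hypothetical helper: `producer` is assumed to behave like kafka.KafkaProducer.
    """
    # Keep each future next to the payload it belongs to.
    futures = [(payload, producer.send(topic, payload)) for payload in payloads]

    # flush() may raise KafkaTimeoutError if futures are still pending when the
    # timeout elapses. A future that already completed with an error does not
    # make flush() raise.
    producer.flush(timeout=flush_timeout)

    failures = []
    for payload, future in futures:
        if future.failed():
            # The stored exception may itself be a KafkaTimeoutError, for
            # example when a batch expired before it could be sent.
            failures.append((payload, future.exception))
    return failures
```

An empty result would mean that every send completed without an error. Anything else lists the payloads that were not delivered, together with the reason.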

bgedik commented 6 years ago

I understand that, but in this case the flush timeout was exceeded: the future took longer to fail than the timeout passed to flush(). I'll double-check that and get back to you. I still think there is a problem.

gmaz42 commented 4 years ago

Any update on this? Without an exception, I basically have to switch to synchronous sending or parse the logs to know how many messages were not delivered.

Both are ugly workarounds.
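A third option might be to count failures through callbacks on the futures returned by send(). The sketch below is only an illustration: `DeliveryTracker` is a hypothetical name, not part of kafka-python. It assumes the futures expose `add_callback` and `add_errback`, as the ones returned by `KafkaProducer.send()` do.

```python
import threading


class DeliveryTracker:
    """Hypothetical helper that counts delivered and failed sends."""

    def __init__(self):
        # Callbacks may run on the producer's background I/O thread,
        # so the counters are guarded by a lock.
        self._lock = threading.Lock()
        self.delivered = 0
        self.errors = []

    def on_success(self, record_metadata):
        with self._lock:
            self.delivered += 1

    def on_error(self, exc):
        with self._lock:
            self.errors.append(exc)

    def track(self, future):
        """Attach both callbacks to a send future and return that future."""
        future.add_callback(self.on_success)
        future.add_errback(self.on_error)
        return future
```

With a real producer this would be used as `tracker.track(producer.send(topic, value))` for each message, followed by `producer.flush()`. Afterwards `len(tracker.errors)` gives the number of messages that were not delivered, assuming all futures completed before flush() returned.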