KafkaConsumer: because the client polled the queue returned by rd_kafka_queue_get_main
instead of the one returned by rd_kafka_queue_get_consumer, the timer for
max.poll.interval.ms was never reset, which eventually resulted in a timeout
despite polling. (See the librdkafka documentation.)
Modifications:
RDKafkaClient:
- rename mainQueue to queue
- use rd_kafka_queue_get_consumer instead of rd_kafka_queue_get_main for KafkaConsumer clients
  -> this resets the timer for max.poll.interval.ms, so the consumer no longer times out despite polling
- invoke rd_kafka_queue_destroy(self.queue) in RDKafkaClient.deinit to release the reference to the queue
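
To illustrate why the choice of queue matters, here is a minimal toy model in C. It is not librdkafka code: the `toy_*` names and `queue_kind_t` are hypothetical, the 100 ms poll cadence is arbitrary, and the 300000 ms limit is assumed here as the value of max.poll.interval.ms. The model only captures the behavior described above, namely that polling the consumer queue resets the timer while polling the main queue does not.

```c
#include <stdbool.h>

/* Hypothetical toy model, NOT librdkafka code: a consumer whose
 * max.poll.interval.ms timer is reset only by polling the consumer queue. */
typedef enum { QUEUE_MAIN, QUEUE_CONSUMER } queue_kind_t;

typedef struct {
    long max_poll_interval_ms; /* models max.poll.interval.ms */
    long last_poll_ms;         /* time of the last poll that reset the timer */
} toy_consumer_t;

/* Poll a queue at time now_ms; only the consumer queue resets the timer. */
static void toy_poll(toy_consumer_t *c, queue_kind_t q, long now_ms) {
    if (q == QUEUE_CONSUMER)
        c->last_poll_ms = now_ms;
}

/* True once the time since the last timer reset exceeds the limit. */
static bool toy_timed_out(const toy_consumer_t *c, long now_ms) {
    return now_ms - c->last_poll_ms > c->max_poll_interval_ms;
}

/* Poll the given queue every 100 ms for total_ms of simulated time and
 * report whether the consumer timed out along the way. */
static bool toy_run(queue_kind_t q, long total_ms) {
    /* 300000 ms is an assumed limit for this sketch. */
    toy_consumer_t c = { .max_poll_interval_ms = 300000, .last_poll_ms = 0 };
    for (long now = 100; now <= total_ms; now += 100) {
        toy_poll(&c, q, now);
        if (toy_timed_out(&c, now))
            return true;
    }
    return false;
}
```

In this model, `toy_run(QUEUE_MAIN, 400000)` reports a timeout even though the loop polls every 100 ms, while `toy_run(QUEUE_CONSUMER, 400000)` does not, which mirrors the symptom from the issue and the effect of the fix.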
Motivation:
This PR fixes issue #110.