morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0
331 stars 86 forks source link

Consumer fell into an endless loop before it aborted in example_KafkaConsumer_Simple #226

Open DittoWii opened 6 months ago

DittoWii commented 6 months ago

I am running a simple example using _example_KafkaConsumer_Simple and example_KafkaProducer_Simple, the latter works perfectly, but it seems that the consumer can never subscribe correctly.
In function KafkaConsumer::subscribe in KafkaConsumer.h, I'm confused about the value of the variable _pendingEvent. So I use dbg, and found that the value of !_pendingEvent in line503 seems uncorrect, so KAFKA_API_DO_LOG(Log::Level::Notice, "subscribed, topics[%s]", topicsStr.c_str()); will never be executed, and the function KafkaConsumer::subscribe will never return, causing core dumped evetually. So is this my problem or .... Looking forward to your reply.

DittoWii commented 6 months ago

and I found that sometimes it works, but sometimes not, am I using it wrong?

c4pQ commented 5 months ago

I believe you have your application crashing with core dump because of uncaught kafka::Exception.

As for _pendingEvent, from code I see:

  1. initially you have it set here - https://github.com/morganstanley/modern-cpp-kafka/blob/main/include/kafka/KafkaConsumer.h#L484
  2. then it's polling
  3. then you either have: 3.1. rebalance callback called and having event.reset() - https://github.com/morganstanley/modern-cpp-kafka/blob/main/include/kafka/KafkaConsumer.h#L920 3.2. or nothing happened during poll and you continue with exception

What do you use for test purposes? Kafka container? Something in cloud? Mock cluster from librdkafka?