mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

rdkafka breaks on exceptions from inside callbacks #162

Open abyss7 opened 5 years ago

abyss7 commented 5 years ago

The most typical way to hit the problem is to create a consumer and then close it. This leads to the following backtrace:

#0  cppkafka::Consumer::handle_rebalance (this=0x7ffefc002910, error=RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, topic_partitions=...) at ../../contrib/cppkafka/src/consumer.cpp:303
#1  0x00007fffdf3dd75b in cppkafka::Consumer::rebalance_proxy (error=RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, partitions=0x7fff040038e0, opaque=0x7ffefc002910) at ../../contrib/cppkafka/src/consumer.cpp:55
#2  0x00007fffdf0937d6 in rd_kafka_poll_cb (rk=0x7ffefc003270, rkq=0x7fff24205e20, rko=0x7fff04003780, cb_type=RD_KAFKA_Q_CB_RETURN, opaque=0x0) at ../../contrib/librdkafka/src/rdkafka.c:3129
#3  0x00007fffdf095f3c in rd_kafka_consumer_close (rk=0x7ffefc003270) at ../../contrib/librdkafka/src/rdkafka.c:2660
#4  0x00007fffdf3de6a7 in cppkafka::Consumer::close (this=0x7ffefc002910) at ../../contrib/cppkafka/src/consumer.cpp:276
#5  0x00007fffdf3dde08 in cppkafka::Consumer::~Consumer (this=0x7ffefc002910) at ../../contrib/cppkafka/src/consumer.cpp:82

There, the next call to assign() returns:

(gdb) info locals
error = RD_KAFKA_RESP_ERR__DESTROY

This causes an exception to be thrown, after which Kafka hangs somewhere inside itself. I was unable to debug the root cause.

Copying the exception handler from the destructor into handle_rebalance() resolves the symptoms.
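As a rough illustration of the workaround described above, the following sketch shows a C++ proxy that catches every exception before control returns to C code. It is a minimal stand-in, not cppkafka or librdkafka code: `c_callback_t`, `fake_c_dispatch`, `FakeConsumer`, and `safe_proxy` are all hypothetical names invented for this example.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a C library callback signature (not librdkafka's real API).
using c_callback_t = void (*)(int error_code, void* opaque);

// Hypothetical stand-in for C code that dispatches a callback, e.g. during shutdown.
void fake_c_dispatch(c_callback_t cb, int error_code, void* opaque) {
    cb(error_code, opaque);
}

// Hypothetical C++ wrapper object; last_error records what the proxy swallowed.
struct FakeConsumer {
    std::string last_error;

    // Throws on any non-zero error code, mimicking a handler that throws on failure.
    void handle_event(int error_code) {
        if (error_code != 0) {
            throw std::runtime_error("handler failed with code " + std::to_string(error_code));
        }
    }
};

// Proxy handed to the C side: every exception is caught here so that none
// unwinds through the C frames that invoked the callback.
void safe_proxy(int error_code, void* opaque) {
    assert(opaque != nullptr);
    auto* self = static_cast<FakeConsumer*>(opaque);
    try {
        self->handle_event(error_code);
    } catch (const std::exception& ex) {
        self->last_error = ex.what();
    } catch (...) {
        self->last_error = "unknown exception";
    }
}
```

Calling `fake_c_dispatch(safe_proxy, -1, &consumer)` on a `FakeConsumer` leaves the message in `consumer.last_error` instead of propagating the exception into the dispatching code. Whether to record, log, or ignore the error is a design choice this sketch does not settle.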

accelerated commented 5 years ago

From what I can see in the code, before close() is called, the consumer callbacks are nullified to prevent holding references to the consumer itself. When handle_rebalance() is called, the callback invoker throws because the assign callback is null, but the exception is caught inside the invoker, which returns an error instead. So the proper solution would be to check the error returned by the callback invoker.

That said, I think this is an rdkafka bug: when close() is called, it should not invoke the assignment callback. The consumer is going away, so this would just be a null assignment.
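The suggestion above, checking the error that the callback invoker returns, could look roughly like the sketch below. It assumes an invoker that converts a thrown exception (including the one raised by calling an empty `std::function`) into an error value. `InvokeResult`, `invoke_checked`, and `rebalance_sketch` are hypothetical names for illustration only and do not reproduce cppkafka's actual invoker.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <string>

// Hypothetical result type returned by the invoker instead of throwing.
struct InvokeResult {
    bool ok;
    std::string error;
};

// Hypothetical invoker: runs the callback and turns any exception into an error value.
InvokeResult invoke_checked(const std::function<void(int)>& callback, int arg) {
    try {
        callback(arg);  // throws std::bad_function_call if the callback is empty
        return {true, ""};
    } catch (const std::bad_function_call&) {
        return {false, "callback not set"};
    } catch (const std::exception& ex) {
        return {false, ex.what()};
    } catch (...) {
        return {false, "unknown exception"};
    }
}

// Hypothetical caller: inspects the invoker's result and skips follow-up work on failure.
bool rebalance_sketch(const std::function<void(int)>& assignment_callback, int partition_count) {
    assert(partition_count >= 0);
    const InvokeResult result = invoke_checked(assignment_callback, partition_count);
    if (!result.ok) {
        return false;  // callback missing or failed: do not continue as if it succeeded
    }
    return true;
}
```

With an empty `std::function`, `rebalance_sketch` returns `false` rather than throwing, which is the behavior the comment argues the rebalance handler should rely on once the callbacks have been nullified.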