morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0

Missing consumer cb causes librdkafka assertion failure #207

Open Arcoth opened 1 year ago

Arcoth commented 1 year ago

Hi,

Consider the following scenario: after subscribing to topic X and seeking within it, we call subscribe on the same KafkaConsumer for a further topic Y. During that call we end up inside rd_kafka_q_serve (which polls the queue and executes operations) with an operation whose rko_type == RD_KAFKA_OP_FETCH and cb_type == RD_KAFKA_Q_CB_CALLBACK (presumably concerning topic X). This eventually leads into rd_kafka_poll_cb, which contains:

                if (!rk->rk_conf.consume_cb ||
                    cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */

We then hit the assertion in rd_kafka_q_serve, because every op must be handled there:

                res = rd_kafka_op_handle(rk, &localq, rko, cb_type, opaque,
                                         callback);
                /* op must have been handled */
                rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);

The workaround I tried was to set the consume_cb that the check above looks for, since KafkaConsumer does not set one. I added

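    rd_kafka_conf_set_consume_cb(conf,
        [](rd_kafka_message_t* /*rkmessage*/, void* /*opaque*/) {
            // Not expected to run: outdated fetch ops are discarded first.
            // Note: a bare `throw;` with no active exception calls std::terminate().
            throw;
        });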

into KafkaConsumer::registerConfigCallbacks. Interestingly, because the fetch op's version is outdated, rd_kafka_consume_cb discards it anyway:

        if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
            rko->rko_type == RD_KAFKA_OP_BARRIER) {
                rd_kafka_op_destroy(rko);
                return RD_KAFKA_OP_RES_HANDLED;
        }

So the callback is never actually invoked, yet librdkafka's logic still requires one to be set. Could we add a vacuous callback (either throwing or a no-op) so that this failure mode is avoided?

kenneth-jia commented 1 year ago

Hi Arcoth, could you please share some demo code that triggers the issue? As I understand it, calling consumer.subscribe(...) twice (for two different topics) might reproduce it, right?