confluentinc / librdkafka

The Apache Kafka C/C++ library

rd_kafka_assign() hangs forever when closing consumer #4884

Open aiquestion opened 1 month ago

aiquestion commented 1 month ago

Description

We encountered an issue when closing the consumer in the Rust SDK: it hangs while calling rd_kafka_assign() in rebalance_cb. The code looks like this (shown in C):

rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE|RD_KAFKA_EVENT_OFFSET_COMMIT|RD_KAFKA_EVENT_ERROR|RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH);
rd_kafka_poll_set_consumer(rk);
rd_kafka_queue_t * queue = rd_kafka_queue_get_consumer(rk);
// should poll here, but we don't do poll to reproduce the bug

// close consumer
rd_kafka_consumer_close_queue(rk, queue);
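while (!rd_kafka_consumer_closed(rk)) {
        /* Wait up to 60 s for the next event; NULL means the poll timed out. */
        rd_kafka_event_t *rkev = rd_kafka_queue_poll(queue, 60 * 1000);
        fprintf(stderr, "%% Get event %s\n", rd_kafka_event_name(rkev));
        if (rkev) {
                if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_REBALANCE)
                        rebalance_cb_event(rk, rkev);
                /* Events returned by rd_kafka_queue_poll() must be destroyed. */
                rd_kafka_event_destroy(rkev);
        }
}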

static void rebalance_cb_event(rd_kafka_t *rk,
                               rd_kafka_event_t* rkev) {
        rd_kafka_error_t *error     = NULL;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        switch (rd_kafka_event_error(rkev)) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                print_partition_list(stderr, rd_kafka_topic_partition_list_new(0));
                if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                        error = rd_kafka_incremental_assign(
                            rk, rd_kafka_topic_partition_list_new(0));
                }
                else {
                        ret_err = rd_kafka_assign(
                            rk, rd_kafka_topic_partition_list_new(0));
                        fprintf(stderr, "assign ret: %s\n",
                               rd_kafka_err2str(ret_err));
                        sleep(3);
                }
                break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                print_partition_list(stderr, rd_kafka_topic_partition_list_new(0));
                if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                        error = rd_kafka_incremental_unassign(rk, rd_kafka_topic_partition_list_new(0));
                } else {
                        ret_err  = rd_kafka_assign(rk, NULL);
                        wait_eof = 0;
                }
                break;
        default:
                rd_kafka_assign(rk, NULL);
                break;
        }
        if (error) {
                fprintf(stderr, "incremental assign failure: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        } else if (ret_err) {
                fprintf(stderr, "assign failure: %s\n",
                        rd_kafka_err2str(ret_err));
        }
}

In some cases the program never exits, and the call stack shows it blocked in rd_kafka_assign():

  * frame #0: 0x00000001916019ec libsystem_kernel.dylib`__psynch_cvwait + 8
    frame #1: 0x000000019163f55c libsystem_pthread.dylib`_pthread_cond_wait + 1228
    frame #2: 0x00000001003ee258 rdkafka_complex_consumer_example`cnd_wait + 12
    frame #3: 0x0000000100397c14 rdkafka_complex_consumer_example`rd_kafka_q_pop_serve + 620
    frame #4: 0x000000010039a764 rdkafka_complex_consumer_example`rd_kafka_op_req + 148
    frame #5: 0x00000001003ce394 rdkafka_complex_consumer_example`rd_kafka_assign + 80
    frame #6: 0x00000001003686f0 rdkafka_complex_consumer_example`main [inlined] rebalance_cb_event(rk=0x000000014e010a00, rkev=<unavailable>) at rdkafka_complex_consumer_example.c:175:35 [opt]
    frame #7: 0x0000000100368634 rdkafka_complex_consumer_example`main(argc=<unavailable>, argv=<unavailable>) at rdkafka_complex_consumer_example.c:619:33 [opt]

After investigation, it seems that:

// in rdkafka.c thread_main:
                int cnt = rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
                if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
                        rd_kafka_cgrp_serve(rk->rk_cgrp);

A proper fix is to handle all ops pending on rkcg->rkcg_ops before calling rd_kafka_q_purge(rkcg->rkcg_ops); see the PR.
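To illustrate why the ordering matters, here is a minimal toy model. It is not librdkafka code: every name in it (toy_queue_t, toy_serve_all, toy_purge, and so on) is hypothetical, and it assumes that a purged op is simply dropped without a reply being sent. Under that assumption, a caller blocked in a request/reply call is never answered if the queue is purged while its request is still pending, whereas serving the queue first answers it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model only: hypothetical types, not librdkafka internals. */
#define TOY_QUEUE_CAP 8

typedef struct toy_op {
        bool *replied; /* set to true once the requester has been answered */
} toy_op_t;

typedef struct toy_queue {
        toy_op_t ops[TOY_QUEUE_CAP];
        size_t len;
} toy_queue_t;

/* Enqueue a request; the requester would block until *replied is true. */
static void toy_enqueue(toy_queue_t *q, bool *replied) {
        assert(q->len < TOY_QUEUE_CAP);
        q->ops[q->len++].replied = replied;
}

/* Serve every pending op, so each requester receives its reply. */
static size_t toy_serve_all(toy_queue_t *q) {
        size_t served = q->len;
        for (size_t i = 0; i < q->len; i++)
                *q->ops[i].replied = true;
        q->len = 0;
        return served;
}

/* Purge: drop all pending ops WITHOUT replying (assumed behaviour). */
static size_t toy_purge(toy_queue_t *q) {
        size_t dropped = q->len;
        q->len = 0;
        return dropped;
}

/* Returns true if a request enqueued just before shutdown gets a reply. */
static bool toy_shutdown_answers_request(bool drain_first) {
        toy_queue_t q = {0};
        bool replied = false;

        toy_enqueue(&q, &replied);
        if (drain_first)
                toy_serve_all(&q); /* handle pending ops first */
        toy_purge(&q);             /* then discard whatever is left */
        return replied;
}
```

In this model, toy_shutdown_answers_request(false) leaves the request unanswered, which corresponds to the caller waiting forever, while toy_shutdown_answers_request(true) answers it before the purge.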

How to reproduce

The hang is intermittent. I managed to reproduce it after making a number of changes to the example code:

Build rdkafka_complex_consumer_example.c and run it; it will hang. After applying the fix in the PR, the consumer closes successfully.

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/confluentinc/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information: