confluentinc / librdkafka

The Apache Kafka C/C++ library

ThreadSanitizer: data race + hang in rd_kafka_destroy (or rd_kafka_destroy_flags) #4811

Open blindspotbounty opened 3 months ago

blindspotbounty commented 3 months ago

Description

I was investigating a hang on destroy that reproduces specifically on x86 (I cannot reproduce it on macOS or Linux on aarch64/M2).

How to reproduce

On x86, it reproduces reliably with the following scenario:

  1. Subscribe to topic and get messages
  2. Revoke all partitions (e.g. remove topic)
  3. Close consumer + destroy client

It reproduces more readily with a rebalance callback enabled and assign called from a different thread, i.e.:

  1. Subscribe to topic with rebalance enabled
  2. Listen for rebalance events but do not call assign/incremental assign in the callback; instead hand the event to a different thread for processing
  3. Confirm revoke in other thread
  4. Close consumer + destroy client

The only thing I have found that reproduces consistently on x86 is the following race (which I don't see on arm):

WARNING: ThreadSanitizer: data race (pid=2992557)
  Write of size 4 at 0x5625e6afb020 by thread T71 (mutexes: write M0):
    #0 rd_kafka_fetch_pos2str ??:? (swift-kafka-clientPackageTests.xctest+0xa00e98)
    #1 rd_kafka_toppar_fetch_decide ??:? (swift-kafka-clientPackageTests.xctest+0x8be8a1)
    #2 rd_kafka_broker_consumer_toppar_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa0a167)
    #3 rd_kafka_broker_consumer_toppars_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84bd86)
    #4 rd_kafka_broker_consumer_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84b6e2)
    #5 rd_kafka_broker_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x849a0a)
    #6 rd_kafka_broker_thread_main rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x83fd6f)
    #7 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Previous write of size 4 at 0x5625e6afb020 by thread T70 (mutexes: write M1):
    #0 rd_kafka_fetch_pos2str ??:? (swift-kafka-clientPackageTests.xctest+0xa00e98)
    #1 rd_kafka_toppar_fetch_decide ??:? (swift-kafka-clientPackageTests.xctest+0x8be8a1)
    #2 rd_kafka_broker_consumer_toppar_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa0a167)
    #3 rd_kafka_broker_consumer_toppars_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84bd86)
    #4 rd_kafka_broker_consumer_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x84b6e2)
    #5 rd_kafka_broker_serve rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x849a0a)
    #6 rd_kafka_broker_thread_main rdkafka_broker.c:? (swift-kafka-clientPackageTests.xctest+0x83fd6f)
    #7 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Location is global '<null>' at 0x000000000000 (swift-kafka-clientPackageTests.xctest+0x38a5020)

  Mutex M0 (0x7b6400040698) created at:
    #0 pthread_mutex_init ??:? (swift-kafka-clientPackageTests.xctest+0x10ce50)
    #1 mtx_init ??:? (swift-kafka-clientPackageTests.xctest+0xb2d938)
    #2 rd_kafka_toppar_new0 ??:? (swift-kafka-clientPackageTests.xctest+0x9fe225)
    #3 rd_kafka_topic_partition_cnt_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae5a85)
    #4 rd_kafka_topic_metadata_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae38e5)
    #5 rd_kafka_topic_new0 ??:? (swift-kafka-clientPackageTests.xctest+0xae2ba7)
    #6 rd_kafka_toppar_get2 ??:? (swift-kafka-clientPackageTests.xctest+0xa01191)
    #7 rd_kafka_topic_partition_ensure_toppar ??:? (swift-kafka-clientPackageTests.xctest+0xa0e00a)
    #8 rd_kafka_assignment_add ??:? (swift-kafka-clientPackageTests.xctest+0x820922)
    #9 rd_kafka_cgrp_assign rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x883ee2)
    #10 rd_kafka_cgrp_handle_assign_op rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x8a03ed)
    #11 rd_kafka_cgrp_op_serve rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x85d807)
    #12 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc974)
    #13 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
    #14 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
    #15 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Mutex M1 (0x7b6400050598) created at:
    #0 pthread_mutex_init ??:? (swift-kafka-clientPackageTests.xctest+0x10ce50)
    #1 mtx_init ??:? (swift-kafka-clientPackageTests.xctest+0xb2d938)
    #2 rd_kafka_toppar_new0 ??:? (swift-kafka-clientPackageTests.xctest+0x9fe225)
    #3 rd_kafka_topic_partition_cnt_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae5a85)
    #4 rd_kafka_topic_metadata_update rdkafka_topic.c:? (swift-kafka-clientPackageTests.xctest+0xae38e5)
    #5 rd_kafka_topic_new0 ??:? (swift-kafka-clientPackageTests.xctest+0xae2ba7)
    #6 rd_kafka_toppar_get2 ??:? (swift-kafka-clientPackageTests.xctest+0xa01191)
    #7 rd_kafka_topic_partition_ensure_toppar ??:? (swift-kafka-clientPackageTests.xctest+0xa0e00a)
    #8 rd_kafka_assignment_add ??:? (swift-kafka-clientPackageTests.xctest+0x820922)
    #9 rd_kafka_cgrp_assign rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x883ee2)
    #10 rd_kafka_cgrp_handle_assign_op rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x8a03ed)
    #11 rd_kafka_cgrp_op_serve rdkafka_cgrp.c:? (swift-kafka-clientPackageTests.xctest+0x85d807)
    #12 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc974)
    #13 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
    #14 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
    #15 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Thread T71 'rdk:broker2' (tid=2992639, running) created by thread T67 at:
    #0 pthread_create ??:? (swift-kafka-clientPackageTests.xctest+0x10b4db)
    #1 thrd_create ??:? (swift-kafka-clientPackageTests.xctest+0xb2de09)
    #2 rd_kafka_broker_add ??:? (swift-kafka-clientPackageTests.xctest+0x83e1fb)
    #3 rd_kafka_broker_update ??:? (swift-kafka-clientPackageTests.xctest+0x8424c1)
    #4 rd_kafka_parse_Metadata0 rdkafka_metadata.c:? (swift-kafka-clientPackageTests.xctest+0x9071f8)
    #5 rd_kafka_parse_Metadata ??:? (swift-kafka-clientPackageTests.xctest+0x8e77b5)
    #6 rd_kafka_handle_Metadata rdkafka_request.c:? (swift-kafka-clientPackageTests.xctest+0xa7fa7c)
    #7 rd_kafka_buf_callback ??:? (swift-kafka-clientPackageTests.xctest+0x8569e8)
    #8 rd_kafka_buf_handle_op ??:? (swift-kafka-clientPackageTests.xctest+0x857db9)
    #9 rd_kafka_op_handle_std ??:? (swift-kafka-clientPackageTests.xctest+0x9fc5ad)
    #10 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc8f2)
    #11 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
    #12 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
    #13 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

  Thread T70 'rdk:broker1' (tid=2992638, running) created by thread T67 at:
    #0 pthread_create ??:? (swift-kafka-clientPackageTests.xctest+0x10b4db)
    #1 thrd_create ??:? (swift-kafka-clientPackageTests.xctest+0xb2de09)
    #2 rd_kafka_broker_add ??:? (swift-kafka-clientPackageTests.xctest+0x83e1fb)
    #3 rd_kafka_broker_update ??:? (swift-kafka-clientPackageTests.xctest+0x8424c1)
    #4 rd_kafka_parse_Metadata0 rdkafka_metadata.c:? (swift-kafka-clientPackageTests.xctest+0x9071f8)
    #5 rd_kafka_parse_Metadata ??:? (swift-kafka-clientPackageTests.xctest+0x8e77b5)
    #6 rd_kafka_handle_Metadata rdkafka_request.c:? (swift-kafka-clientPackageTests.xctest+0xa7fa7c)
    #7 rd_kafka_buf_callback ??:? (swift-kafka-clientPackageTests.xctest+0x8569e8)
    #8 rd_kafka_buf_handle_op ??:? (swift-kafka-clientPackageTests.xctest+0x857db9)
    #9 rd_kafka_op_handle_std ??:? (swift-kafka-clientPackageTests.xctest+0x9fc5ad)
    #10 rd_kafka_op_handle ??:? (swift-kafka-clientPackageTests.xctest+0x9fc8f2)
    #11 rd_kafka_q_serve ??:? (swift-kafka-clientPackageTests.xctest+0xa1d3a6)
    #12 rd_kafka_thread_main rdkafka.c:? (swift-kafka-clientPackageTests.xctest+0x76de7b)
    #13 _thrd_wrapper_function tinycthread.c:? (swift-kafka-clientPackageTests.xctest+0xb2df8c)

SUMMARY: ThreadSanitizer: data race ??:? in rd_kafka_fetch_pos2str

It reproduces more readily with v2.3.0 than with v2.5.0. With v2.3.0 I can reproduce it without an external rebalance; with v2.5.0 it reproduces much more readily when a custom rebalance is used.

This scenario reproduces quite reliably with the Swift wrapper. There is Swift code that can catch this race:

  1. git clone --recursive https://github.com/ordo-one/swift-kafka-client
  2. git checkout try-reproduce-race
  3. KAFKA_HOST= swift test --filter testConsumerWithRebalance --sanitize=thread

Since the only difference I see between the platforms is this race caught by TSan, I suspect it may cause problems for the subsequent client destroy call.

Remark: I tried both rd_kafka_destroy and rd_kafka_destroy_flags with RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE, but the behavior is the same.

blindspotbounty commented 3 months ago

Additionally, I have the following refcount log:

REFCNT DEBUG: (&(rkb)->rkb_refcnt)                0 +1:   0x7f11d0018df8: rd_kafka_broker_add:4978
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                1 +1:   0x7f11d0018df8: rd_kafka_broker_add:5046
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                2 +1:   0x7f11d0018df8: rd_kafka_broker_monitor_add:6091
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                3 +1:   0x7f11d0018df8: rd_kafka_broker_update:5593
REFCNT DEBUG: &(rkb)->rkb_refcnt                  4 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkb)->rkb_refcnt                  3 +1:   0x7f11d0018df8: rd_kafka_broker_update:5580
REFCNT DEBUG: &(rkb)->rkb_refcnt                  4 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkb)->rkb_refcnt                  3 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: &(rkb)->rkb_refcnt                  4 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: &(rkb)->rkb_refcnt                  5 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: &(rkb)->rkb_refcnt                  6 +1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1315
REFCNT DEBUG: (&(leader)->rkb_refcnt)             7 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            8 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: &(leader)->rkb_refcnt               9 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(leader)->rkb_refcnt)             8 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            9 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: &(leader)->rkb_refcnt               10 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(leader)->rkb_refcnt)             9 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            10 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 12 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: &(leader)->rkb_refcnt               11 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 11 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                10 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 11 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: (&(leader)->rkb_refcnt)             10 +1:   0x7f11d0018df8: rd_kafka_toppar_leader_update:744
REFCNT DEBUG: (&(new_rkb)->rkb_refcnt)            11 +1:   0x7f11d0018df8: rd_kafka_toppar_broker_migrate:1045
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3350
REFCNT DEBUG: &(rktp->rktp_next_broker)->rkb_refcnt 13 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3367
REFCNT DEBUG: &(leader)->rkb_refcnt               12 -1:   0x7f11d0018df8: rd_kafka_topic_metadata_update:1405
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                13 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                14 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                15 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   16 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                17 +1:   0x7f11d0018df8: rd_kafka_broker_trigger_monitors:6055
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     18 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rko->rko_u.broker_monitor.rkb)->rkb_refcnt 18 -1:   0x7f11d0018df8: rd_kafka_op_destroy:473
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     16 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   15 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     16 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     15 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                15 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   15 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     16 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     15 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   14 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     15 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     14 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   13 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     14 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   12 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     12 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                11 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   12 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                12 +1:   0x7f11d0018df8: rd_kafka_broker_weighted:1482
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                13 +1:   0x7f11d0018df8: rd_kafka_buf_new_request0:162
REFCNT DEBUG: &(rkb)->rkb_refcnt                  14 -1:   0x7f11d0018df8: rd_kafka_metadata_refresh_topics:1423
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   13 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     14 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: (&(rkbuf->rkbuf_rkb)->rkb_refcnt)   12 +1:   0x7f11d0018df8: rd_kafka_req_response:1945
REFCNT DEBUG: &(rkb)->rkb_refcnt                  13 +1:   0x7f11d0018df8: rd_kafka_broker_update:5580
REFCNT DEBUG: &(rkb)->rkb_refcnt                  14 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     13 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rkbuf->rkbuf_rkb)->rkb_refcnt     12 -1:   0x7f11d0018df8: rd_kafka_buf_destroy_final:78
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    11 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    10 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    9 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: &(rktp->rktp_broker)->rkb_refcnt    8 -1:   0x7f11d0018df8: rd_kafka_broker_op_serve:3443
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                7 +1:   0x7f11d0018df8: rd_kafka_broker_weighted:1482
REFCNT DEBUG: &(good)->rkb_refcnt                 8 -1:   0x7f11d0018df8: rd_kafka_broker_weighted:1481
REFCNT DEBUG: &(rkb)->rkb_refcnt                  7 +1:   0x7f11d0018df8: rd_kafka_broker_update:5580
REFCNT DEBUG: &(rkb)->rkb_refcnt                  8 -1:   0x7f11d0018df8: rd_kafka_broker_update:5618
REFCNT DEBUG: &(rkb)->rkb_refcnt                  7 -1:   0x7f11d0018df8: rd_kafka_destroy_internal:1225
REFCNT DEBUG: (&(rkb)->rkb_refcnt)                6 +1:   0x7f11d0018df8: rd_kafka_broker_trigger_monitors:6055
REFCNT DEBUG: &(rko->rko_u.broker_monitor.rkb)->rkb_refcnt 7 -1:   0x7f11d0018df8: rd_kafka_op_destroy:473
REFCNT DEBUG: &(rkb)->rkb_refcnt                  6 -1:   0x7f11d0018df8: rd_kafka_broker_monitor_del:6122

But I am not familiar enough with librdkafka internals to tell which references were not released. Could someone help me check that, please?
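
A log like this can be tallied mechanically as a first step. Below is a minimal C sketch, assuming every entry follows the line format shown above (`REFCNT DEBUG: <expr> <count> +1|-1: <address>: <function>:<line>`). The helper names `refcnt_parse_line` and `refcnt_net` are hypothetical and are not part of librdkafka.

```c
#include <assert.h>
#include <string.h>

/* Parse one "REFCNT DEBUG:" line in the format shown in the log above.
 * Returns +1 for a reference taken, -1 for a reference dropped, and 0 if
 * the line does not match. On a match, the name of the function that
 * changed the count (without its line number) is copied into func. */
int refcnt_parse_line(const char *line, char *func, size_t func_size) {
    assert(line != NULL && func != NULL && func_size > 0);
    func[0] = '\0';

    if (strncmp(line, "REFCNT DEBUG:", 13) != 0)
        return 0;

    int delta;
    const char *op = strstr(line, " +1:");
    if (op != NULL) {
        delta = 1;
    } else if ((op = strstr(line, " -1:")) != NULL) {
        delta = -1;
    } else {
        return 0;
    }

    /* After the operation comes "<address>: <function>:<line>". */
    const char *addr_end = strstr(op + 4, ": ");
    if (addr_end == NULL)
        return 0;

    const char *name = addr_end + 2;
    size_t len = strcspn(name, ":\n");
    if (len >= func_size)
        len = func_size - 1;
    memcpy(func, name, len);
    func[len] = '\0';
    return delta;
}

/* Sum the deltas over a whole log held in one string, one entry per line.
 * Each line is copied into a local buffer so that parsing never reads
 * past the end of the current entry. */
int refcnt_net(const char *log) {
    char line[512], func[128];
    int net = 0;

    while (*log != '\0') {
        size_t len = strcspn(log, "\n");
        size_t n = len < sizeof(line) - 1 ? len : sizeof(line) - 1;
        memcpy(line, log, n);
        line[n] = '\0';
        net += refcnt_parse_line(line, func, sizeof(func));
        log += len;
        if (*log == '\n')
            log++;
    }
    return net;
}
```

Grouping the deltas by the returned function name would show which call sites took more references than they released. A non-zero net value only hints at an unreleased reference if the log covers the whole lifetime of the object.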

emasab commented 1 month ago

Hi @blindspotbounty, I think that data race is unrelated to the hang on destroy: both threads are calling rd_kafka_fetch_pos2str and writing to the static field that holds the returned string. (This could lead to an incorrect log message, but the string is still NUL-terminated.)
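
To illustrate the general pattern, here is a simplified C sketch. It is not librdkafka's actual source: the type `fetch_pos_t` and the functions `pos2str_shared` and `pos2str_r` are hypothetical names used only for this example. A formatter that returns a pointer into static storage shares that storage among all callers, so concurrent calls from two threads write to the same memory, which is the kind of access TSan reports. A variant that writes into a caller-supplied buffer has no shared state.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for a fetch position; not the librdkafka type. */
typedef struct {
    int64_t offset;
    int32_t leader_epoch;
} fetch_pos_t;

/* Shared-state pattern: every caller gets a pointer to the same static
 * buffer. Two threads calling this at the same time both write to buf,
 * which is a data race even though the result stays NUL-terminated. */
const char *pos2str_shared(fetch_pos_t pos) {
    static char buf[64];
    snprintf(buf, sizeof(buf), "offset %" PRId64 " (leader epoch %" PRId32 ")",
             pos.offset, pos.leader_epoch);
    return buf;
}

/* Race-free alternative: the caller owns the buffer, so concurrent calls
 * with distinct buffers never touch the same memory. */
const char *pos2str_r(fetch_pos_t pos, char *buf, size_t size) {
    assert(buf != NULL && size > 0);
    snprintf(buf, size, "offset %" PRId64 " (leader epoch %" PRId32 ")",
             pos.offset, pos.leader_epoch);
    return buf;
}
```

Even in a single thread, two consecutive calls to `pos2str_shared` return the same pointer, so the first result is overwritten by the second. That matches the "incorrect log message" symptom: the text may be wrong, but the memory stays valid.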

For the hang on destroy, especially if you're removing a topic, try applying this PR https://github.com/confluentinc/librdkafka/pull/4724