Open Quuxplusone opened 2 months ago
@Quuxplusone thanks for the description of the problem. Will try to reproduce it, maybe it's linked to stopping and restarting the broker. Could you try it without `rd_kafka_conf_set_rebalance_cb` to see if it happens too?
With vanilla librdkafka v2.3.0, but with our custom `rd_kafka_conf_set_rebalance_cb` removed from the groupconsumer, our test is still red. With v2.3.0 plus removing our `rd_kafka_conf_set_rebalance_cb` plus adding #4667, our test remains green. (IOW, whatever our weird rebalance callback is doing, and regardless of whether it's necessary for us in general, it seems irrelevant to this specific test.)
@Quuxplusone we're also facing similar issues in our use-cases, but it's very sporadic. We're working on a stable reproducer to move further. Checking if you have a public reproducer that you could share?
Currently, I'm trying with the following test-setup, but the issue is reproduced very rarely:
Any ideas for improving the reproducer would be very helpful.
Nope, I never came up with a reproducer. But we haven't seen the problem at all since we applied #4667. I can add that our test topic definitely didn't have 1000 partitions, and off the top of my head I'm pretty sure it just had 1. Likewise I'm pretty sure we have only 1 broker.
Thanks for the inputs @Quuxplusone. Using the steps mentioned in my previous comment, I could sporadically reproduce the issue. I tried collecting `valgrind` traces and noticed a definite memory leak in the cases where I could reproduce the issue:
```
==3234364== 6,836 (1,000 direct, 5,836 indirect) bytes in 1 blocks are definitely lost in loss record 216 of 223
==3234364==    at 0x4C3CE4B: calloc (vg_replace_malloc.c:1328)
==3234364==    by 0xA02869A: rd_calloc (rd.h:161)
==3234364==    by 0xA02869A: rd_kafka_topic_new0 (rdkafka_topic.c:333)
==3234364==    by 0xA07C92A: rd_kafka_toppar_get2 (rdkafka_partition.c:443)
==3234364==    by 0xA07FC16: rd_kafka_topic_partition_ensure_toppar (rdkafka_partition.c:2964)
==3234364==    by 0xA088476: rd_kafka_assignment_add (rdkafka_assignment.c:732)
==3234364==    by 0xA0664EA: rd_kafka_cgrp_assign (rdkafka_cgrp.c:3659)
==3234364==    by 0xA07494B: rd_kafka_cgrp_handle_assign_op (rdkafka_cgrp.c:4858)
==3234364==    by 0xA07494B: rd_kafka_cgrp_op_serve (rdkafka_cgrp.c:5023)
==3234364==    by 0xA03DD0E: rd_kafka_q_serve (rdkafka_queue.c:553)
==3234364==    by 0xA00D6F3: rd_kafka_thread_main (rdkafka.c:2117)
==3234364==    by 0x55BF2D1: start_thread (pthread_create.c:476)
==3234364==    by 0x5F9AE72: clone (clone.S:95)
```
Is it of any help in debugging the root cause? This memory leak isn't present when the test program exits cleanly. Note that I've also used @Mekk's patch to trigger the memory leak; otherwise it'd be very difficult to identify among the dozens of objects in the heap.
Cc: @emasab @edenhill
@Quuxplusone could you verify these things?

- That you're not calling `rd_kafka_destroy` without closing the consumer first. (That will close the consumer too, but after setting a flag.)
- That all these objects are destroyed before calling `rd_kafka_destroy()`, even in corner cases. For C, make sure the following objects are destroyed prior to calling `rd_kafka_consumer_close()` and `rd_kafka_destroy()`:
  - `rd_kafka_message_t`
  - `rd_kafka_topic_t`
  - `rd_kafka_topic_partition_t`
  - `rd_kafka_topic_partition_list_t`
  - `rd_kafka_event_t`
  - `rd_kafka_queue_t`
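The destruction order being asked about can be sketched as follows (a minimal illustration under the checklist's assumptions, not the reporter's actual code; error handling omitted, and the particular objects held are hypothetical):

```c
#include <librdkafka/rdkafka.h>

/* Sketch: every librdkafka object that holds a reference must be
 * released before rd_kafka_consumer_close()/rd_kafka_destroy(),
 * or the handle's refcount never reaches zero and destroy hangs. */
static void shutdown_consumer(rd_kafka_t *rk,
                              rd_kafka_message_t *msg,
                              rd_kafka_topic_t *rkt,
                              rd_kafka_topic_partition_list_t *parts,
                              rd_kafka_event_t *ev,
                              rd_kafka_queue_t *q) {
        /* 1. Release message/topic/partition-list/event/queue objects first. */
        if (msg)   rd_kafka_message_destroy(msg);
        if (rkt)   rd_kafka_topic_destroy(rkt);
        if (parts) rd_kafka_topic_partition_list_destroy(parts);
        if (ev)    rd_kafka_event_destroy(ev);
        if (q)     rd_kafka_queue_destroy(q);

        /* 2. Only then close the consumer and destroy the handle. */
        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);
}
```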
Could you explain how to verify that? (Especially since in this case we have multiple independent `rd_kafka_t` objects alive at once.)
Our working hypothesis is still that #4667 fixes a leak of one `rd_kafka_toppar_keep`. All our observations are consistent with that hypothesis, so after applying #4667 downstream and seeing no more hangs, I haven't looked at this in quite a while. If it's possible to detect leaked toppars, such that I could follow your advice above to make sure I wasn't leaking anything... could you apply the same approach to determine whether librdkafka is leaking?
@Quuxplusone given that https://github.com/confluentinc/librdkafka/commit/8e20e1ee79b188ae610aac3a2d2517f7f12dd890 adds a reference from an `rd_kafka_op_t` of type `BARRIER` to the topic partition, in case you're forwarding a partition to a queue you should verify that queue destruction happens before calling destroy on those `rd_kafka_t` objects.
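As a hedged sketch of what that ordering check might look like (hypothetical variable names; assumes a partition queue obtained via `rd_kafka_queue_get_partition` and forwarded to an application-owned queue):

```c
/* Sketch: if a partition's ops are forwarded to a queue, that queue
 * must be destroyed before rd_kafka_destroy(), since a BARRIER op
 * sitting in it can hold a toppar reference (per commit 8e20e1e). */
rd_kafka_queue_t *partq =
        rd_kafka_queue_get_partition(rk, "T1", 0 /* partition */);
rd_kafka_queue_forward(partq, app_queue);   /* app_queue: application-owned */

/* ... consume from app_queue ... */

rd_kafka_queue_destroy(partq);      /* drop queue references first */
rd_kafka_queue_destroy(app_queue);
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);               /* now the toppar refcount can reach 0 */
```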
I've created PR https://github.com/confluentinc/librdkafka/pull/4724 to fix the case I was mentioning in https://github.com/confluentinc/librdkafka/pull/4667. You can try it to see if it fixes your case.
@Quuxplusone can this be closed after merging the fix? Is #4724 solving your case?
@emasab: I haven't tested #4724. At the moment, it has merge conflicts.
Our working hypothesis is still that https://github.com/confluentinc/librdkafka/pull/4667 fixes a leak of one `rd_kafka_toppar_keep`. All our observations are consistent with that hypothesis, so after applying https://github.com/confluentinc/librdkafka/pull/4667 downstream and seeing no more hangs, I haven't looked at this in quite a while.
When there's a new release of librdkafka, we'll certainly try upgrading to it, and then we can see whether it's safe for us to revert #4667 or whether we still see the problem. But #4724 isn't even in master yet, let alone a new release, right?
I suggest that you should at least try to figure out whether #4724 is correct; if it is correct, then merge it; if it's incorrect, don't merge it. This should be done completely independently of whether it happens to fix #4674 or not.
@Quuxplusone #4667 fixes the leak because it removes the reference to the toppar, but that reference was introduced to fix a bug: if the barrier doesn't contain the toppar but only the epoch, the code removes messages with that epoch even if they're from a different partition, and those messages are never delivered. So the fix cannot be reverted.
Given that you cannot reliably reproduce the case except with your test that you cannot share, I don't have a way to check if it solves your case. When the fix is released you can try it to see if your issue is solved.
I'll also try to reproduce it before or after the fix from the instructions from @Quuxplusone or @Mrigank11.
I'm pulling this out into a new Issue, because we keep discussing it in different random PRs/issues and it'll help to have just one place to discuss it.
On #4667 I wrote:
Then in https://github.com/confluentinc/librdkafka/pull/4669#issuecomment-2037799703 I wrote:
Then @emasab asked:
I can't get super detailed (because the layers of abstraction between librdkafka and the level of the test itself are all three of confusing/internal/proprietary), but I'm pretty sure we don't know anything about "assignors" so we're doing whatever the default is, there. The structure of our test is more or less:

- Two `rd_kafka_t`s in the same process: one a groupconsumer on topic T1 and broker B1, and the other a producer on topic T2 on broker B2. (Actually, make that four: we have another (non-group) consumer running, and another producer, too, for other purposes. I think it's just four total.)
- The groupconsumer is configured with `enable.auto.commit=false`, `enable.auto.offset.store=false`, `auto.offset.reset=smallest`. Also, `rd_kafka_conf_set_rebalance_cb` is set to a callback that looks for `RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS` and remembers how many partitions it saw.
- The groupconsumer loops on `rd_kafka_consumer_poll` until either it returns NULL or until it has returned `RD_KAFKA_RESP_ERR__PARTITION_EOF` on T1 as many times as there are partitions. (I suspect this second condition, and thus the rebalance callback, is irrelevant; but I don't know.) (Offsets are committed with `rd_kafka_commit(rk, toppar, true)`, where `toppar` is freshly created with `rd_kafka_topic_partition_list_new` and destroyed with `rd_kafka_topic_partition_list_destroy` immediately afterward.)
- Call `rd_kafka_consumer_close(rk)`. This seems to return `RD_KAFKA_RESP_ERR_NO_ERROR`.
- Call `rd_kafka_destroy(rk)`. This blocks here... waiting for `rd_kafka_thread_main` to exit, but it's blocked... in `rd_kafka_broker_thread_main`, but I'm not sure which thread. We have a total of 35 threads still extant at this point. Remember that the nongroup consumer and the two producers are all still running.

Anyway, I don't think this gives you enough to go on, but at least we have a central place to talk about the hang now, instead of scattering it over different PRs.