confluentinc / librdkafka

The Apache Kafka C/C++ library

Consumer gets stuck on rd_kafka_destroy #3143

Closed: hunyadi-dev closed this issue 2 years ago

hunyadi-dev commented 3 years ago

Description

When using the C API to implement a Kafka consumer, I believe I am following the required termination sequence, but when I call rd_kafka_destroy on my consumer handle it often hangs and never returns.

With the debug: all setting enabled, these are the last few log lines before the hang:

%7|1605183208.326|CONNECT|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Connected to ipv6#[::1]:9092
%7|1605183208.326|CONNECTED|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Connected (#2)
%7|1605183208.326|STATE|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1605183208.326|BROADCAST|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1605183208.326|TERMINATE|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Handle is terminating in state APIVERSION_QUERY: 3 refcnts (0x7f809100b918), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 1 request(s) in retry+outbuf
%7|1605183208.328|TERMINATE|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Handle is terminating in state APIVERSION_QUERY: 3 refcnts (0x7f809100b918), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1605183208.329|TERMINATE|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/0: Handle is terminating in state APIVERSION_QUERY: 3 refcnts (0x7f809100b918), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
(this last message repeats indefinitely; note that the first time it appears, one request is reported as failed)

How to reproduce

I prepared a github repo here with a small example project: https://github.com/hunyadi-dev/librdkafka_demo

Steps to build and run:

mkdir Librdkafka_issue_demo && cd Librdkafka_issue_demo
git clone https://github.com/hunyadi-dev/librdkafka_demo.git src
mkdir build && cd build
cmake -G Ninja -DCMAKE_BUILD_TYPE=Debug -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl/ ../src
export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/ # For some reason my cmake could not find this otherwise
ninja -j8
./librdkafka_issue_demo

Checklist

Librdkafka version: v1.5.0 (also reproduced on v1.5.2)
Apache Kafka version: 2.6.0 (Commit:62abe01bee039651)
Operating system: macOS Catalina, version 10.15.7 (19H2)
Broker log excerpt:

[2020-11-12 13:22:42,636] INFO [GroupCoordinator 0]: Preparing to rebalance group librdkafka_issue_demo_group in state PreparingRebalance with old generation 78 (__consumer_offsets-11) (reason: Adding new member rdkafka-b84cbc54-6946-4e99-87db-8c23b18f71ac with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:42,636] INFO [GroupCoordinator 0]: Stabilized group librdkafka_issue_demo_group generation 79 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:42,637] INFO [GroupCoordinator 0]: Assignment received from leader for group librdkafka_issue_demo_group for generation 79 (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:44,632] INFO [TransactionCoordinator id=0] Initialized transactionalId ConsumeKafkaTest_transaction_id with producerId 0 and producer epoch 111 on partition __transaction_state-43 (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-12 13:22:44,648] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id rdkafka-b84cbc54-6946-4e99-87db-8c23b18f71ac] in group librdkafka_issue_demo_group has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:44,649] INFO [GroupCoordinator 0]: Preparing to rebalance group librdkafka_issue_demo_group in state PreparingRebalance with old generation 79 (__consumer_offsets-11) (reason: removing member rdkafka-b84cbc54-6946-4e99-87db-8c23b18f71ac on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:44,649] INFO [GroupCoordinator 0]: Group librdkafka_issue_demo_group with generation 80 is now empty (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:44,657] INFO [GroupCoordinator 0]: Preparing to rebalance group librdkafka_issue_demo_group in state PreparingRebalance with old generation 80 (__consumer_offsets-11) (reason: Adding new member rdkafka-75d611a9-cd93-4148-8231-149cb3222908 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:44,658] INFO [GroupCoordinator 0]: Stabilized group librdkafka_issue_demo_group generation 81 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:44,658] INFO [GroupCoordinator 0]: Assignment received from leader for group librdkafka_issue_demo_group for generation 81 (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:46,659] INFO [TransactionCoordinator id=0] Initialized transactionalId ConsumeKafkaTest_transaction_id with producerId 0 and producer epoch 112 on partition __transaction_state-43 (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-12 13:22:46,674] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id rdkafka-75d611a9-cd93-4148-8231-149cb3222908] in group librdkafka_issue_demo_group has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:46,674] INFO [GroupCoordinator 0]: Preparing to rebalance group librdkafka_issue_demo_group in state PreparingRebalance with old generation 81 (__consumer_offsets-11) (reason: removing member rdkafka-75d611a9-cd93-4148-8231-149cb3222908 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:46,674] INFO [GroupCoordinator 0]: Group librdkafka_issue_demo_group with generation 82 is now empty (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:46,684] INFO [GroupCoordinator 0]: Preparing to rebalance group librdkafka_issue_demo_group in state PreparingRebalance with old generation 82 (__consumer_offsets-11) (reason: Adding new member rdkafka-0f6eb6ac-1822-4235-a0a3-e48d8626ecfc with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:46,684] INFO [GroupCoordinator 0]: Stabilized group librdkafka_issue_demo_group generation 83 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:46,685] INFO [GroupCoordinator 0]: Assignment received from leader for group librdkafka_issue_demo_group for generation 83 (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:48,692] INFO [TransactionCoordinator id=0] Initialized transactionalId ConsumeKafkaTest_transaction_id with producerId 0 and producer epoch 113 on partition __transaction_state-43 (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-12 13:22:50,697] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id rdkafka-0f6eb6ac-1822-4235-a0a3-e48d8626ecfc] in group librdkafka_issue_demo_group has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:50,697] INFO [GroupCoordinator 0]: Preparing to rebalance group librdkafka_issue_demo_group in state PreparingRebalance with old generation 83 (__consumer_offsets-11) (reason: removing member rdkafka-0f6eb6ac-1822-4235-a0a3-e48d8626ecfc on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:50,697] INFO [GroupCoordinator 0]: Group librdkafka_issue_demo_group with generation 84 is now empty (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 13:22:56,805] INFO [TransactionCoordinator id=0] Completed rollback of ongoing transaction for transactionalId ConsumeKafkaTest_transaction_id due to timeout (kafka.coordinator.transaction.TransactionCoordinator)

Critical issue: no

edenhill commented 3 years ago

Make sure that you have destroyed all other objects, topics, messages, etc, before calling destroy().
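
For reference, a minimal sketch of that ordering using the C API; the names consumer, msg, partitions and topic are placeholders for whatever objects an application still holds, not the demo's actual code:

#include <librdkafka/rdkafka.h>

void shutdown_consumer(rd_kafka_t* consumer,
                       rd_kafka_message_t* msg,
                       rd_kafka_topic_partition_list_t* partitions,
                       rd_kafka_topic_t* topic) {
  rd_kafka_consumer_close(consumer);               // leave the group, stop fetchers

  if (msg)        rd_kafka_message_destroy(msg);   // every message still held
  if (partitions) rd_kafka_topic_partition_list_destroy(partitions);
  if (topic)      rd_kafka_topic_destroy(topic);   // every topic handle

  rd_kafka_destroy(consumer);                      // only now; blocks until all refs are gone
}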

hunyadi-dev commented 3 years ago

I have printouts that show the deletion order of the librdkafka handles (every handle is paired with its proper destroy call via std::unique_ptr deleters; a minimal sketch of that pattern follows the printout):

Invoked topic deleter.
Invoked producer deleter
Producer_deleter done
Invoked topic partition list deleter.
Invoked consumer deleter.
Closing consumer connections...
Rebalance triggered.
revoked:
kf_topic_partition_list: [topic: ConsumeKafkaTest, partition: 0, offset: -1001]
Attempting to destroy consumer...
(stuck here)
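
For illustration, a minimal sketch of the unique_ptr-deleter pairing described above; the deleter and alias names are hypothetical, not the demo repo's actual code:

#include <memory>
#include <librdkafka/rdkafka.h>

struct ConsumerDeleter {
  void operator()(rd_kafka_t* rk) const {
    rd_kafka_consumer_close(rk);   // "Closing consumer connections..."
    rd_kafka_destroy(rk);          // "Attempting to destroy consumer..."
  }
};

struct PartitionListDeleter {
  void operator()(rd_kafka_topic_partition_list_t* tpl) const {
    rd_kafka_topic_partition_list_destroy(tpl);
  }
};

using ConsumerPtr      = std::unique_ptr<rd_kafka_t, ConsumerDeleter>;
using PartitionListPtr = std::unique_ptr<rd_kafka_topic_partition_list_t, PartitionListDeleter>;
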
edenhill commented 3 years ago

Could you try the latest librdkafka master branch? We have this fix 650799afdaf8c7c810f16df5b902b809fec17ffd that is of interest.

hunyadi-dev commented 3 years ago

I tried the latest commit and the issue is still present. Also, on the current HEAD some of my tests fail because a consumer rebalance is not triggered the first time rd_kafka_consumer_poll is called.

ajbarb commented 3 years ago

@edenhill, @hunyadi-dev, I am hitting a similar issue where one of the broker threads is blocked in destroy on refcnt 3, but the broker state is INIT.

Level: Debug, Message: [thrd:25.107.195.133:9092/15]: 25.107.195.133:9092/15: Handle is terminating in state INIT: 3 refcnts (0000020077693340), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf

edenhill commented 3 years ago

@hunyadi-dev

The message in the "Resetting offsets manually" loop was not destroyed; adding that destroy call (and increasing the poll timeout so a rebalance always happens) fixed the hang.

This was on master.
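
For illustration, a minimal sketch of that fix, assuming a poll loop roughly like the demo's (the function name and the 2000 ms timeout are illustrative only):

#include <librdkafka/rdkafka.h>

void drain_one(rd_kafka_t* consumer) {
  // Poll with a timeout long enough for the rebalance to complete.
  rd_kafka_message_t* msg = rd_kafka_consumer_poll(consumer, 2000);
  if (msg) {
    // ... inspect msg->err / msg->payload here ...
    rd_kafka_message_destroy(msg);  // without this, rd_kafka_destroy() hangs
  }
}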

edenhill commented 3 years ago

@ajbarb Please try to reproduce on latest master, make sure that all outstanding objects (messages in particular) are destroyed before destroying the consumer, and if still an issue, please provide a reproducible test case.

Mekk commented 1 year ago

I am also observing this problem fairly regularly (currently testing on rdkafka 1.9.2, but I saw it on some older versions as well).

Unfortunately I haven't managed to create a simple reproduction yet (for one reason or another I cannot reproduce it with simple examples, and the actual problem happens in a rather complex app), but some observations:

  1. I started to see it more often once I compiled my code with sanitizers (ASAN+LSAN), and even more often after setting ASAN_OPTIONS=fast_unwind_on_malloc=0. Both are probably not directly related (especially since rdkafka itself is not built with ASAN), but they have a simple effect I have also noticed in other cases: thread timing changes, and various things (malloc/free!) become slower and much more likely to allow a context switch.

  2. IIRC I never got it with a „pure consumer”; I get it in an app which uses (within a single process) a few producers and a single consumer.

  3. It may or may not matter that my code uses a temporary topic which is removed earlier.

Picture of the stuck process in the debugger (screenshot attached in the original issue).

So the process logs „Handle is terminating in state INIT” indefinitely (I once accidentally left it for 4 days) and nothing else happens.

The latter may be of some importance: rd_kafka_terminating returns true but that alone doesn't break the loop, while rd_kafka_broker_terminating returns false because the handle is in state INIT, so the loop continues.

I will try to grab some more detailed debugging output.

Mekk commented 1 year ago

One general idea before digging deeper: perhaps rd_kafka_broker_thread_main could detect that it has remained in that state (rd_kafka_terminating returning true) for a long time and finally break out in such a case, as a safety measure, even if rd_kafka_broker_terminating returns false.

PS I don't believe my code is leaking anything, at least nothing I am aware of – I am rather dogmatic about using smart pointers and RAII objects, and I also run simpler apps based on the same library and approach under the sanitizer, which doesn't report leaks.

Mekk commented 1 year ago

kafka-stuck-consumer.txt.gz

I attach the full log of such a stuck process (it is formatted using my custom format, which may be useful as it shows timestamps and thread numbers, but the text is straight from rdkafka). This process produced some messages (to permanent topics, those APCTST*), consumed some (from the temporary topic apctst-replyq-… which it creates and removes; in general this is some RPC emulation) and then tries to shut down.

Some observations: …

Mekk commented 1 year ago

… the „source of evil” is perhaps the broker state change from DOWN back to INIT. It happens twice – two threads log Broker changed state DOWN -> INIT:

If I grep correctly, this change to INIT must have happened here: https://github.com/edenhill/librdkafka/blob/v1.9.2/src/rdkafka_broker.c#L5277

If I understand correctly, we go back from DOWN to INIT because there are some remaining refcnts, but I have no clue what those could be.

Mekk commented 1 year ago

My suggestion: add a time limit for such a state (rd_kafka_terminating true but rd_kafka_broker_terminating false). If it lasts, say, a minute (preferably configurable), give up and terminate the broker in spite of the extra refcnts.

Then a) the app won't be stuck forever anymore, and b) it will be possible to detect what exactly was leaked via sanitizer/valgrind/… (or, if the leaked reference causes a crash, to detect what was accessed too late).

(This suggestion is valid even if I eventually find some leak of my own somewhere – such errors happen, and … leak detectors usually require the app to finish before they can report leaks.)

edenhill commented 1 year ago

https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination

Mekk commented 1 year ago

patch.txt (or the same in an easier-to-read form: https://gist.github.com/Mekk/ed05c9fc95196bfb4aac5629375295e0 )

A rough patch implementing the idea above (it breaks the broker thread loop if more than a minute passes since rd_kafka_terminating started returning true). I don't think it is applicable as-is, but with some tuning (like making the timeout configurable) it may be reasonable. The diff is made against v1.9.2.

With rdkafka patched this way I hit the problem a few times, and each time the app finished cleanly after that minute. What is important: the sanitizer didn't detect any memory leaks, which may confirm my claim that I don't leak anything (unless rdkafka does some bulk recursive resource release).

Mekk commented 1 year ago

after-patch.txt.gz

For comparison, an example log from the patched rdkafka (the final part shows a rather clean completion). As I said, the app was run under ASAN+LSAN and didn't report any memory leaks. I can't exclude the possibility that my app still had some object released too late (I will look for that), but it isn't anything trivial (unless my logging callback counts, but that one really should work to the very end).

To summarize, I'd really suggest applying a change of this kind – even if it is the app's fault (I am not sure, but maybe), the current behaviour is very unfriendly: one gets an indefinitely stuck app with no realistic way to find out what the problem is.

Mekk commented 1 year ago

Eureka!

Looks like in my case the problem was caused by an RdKafka::Message object which was sometimes still alive (the app works as a pipeline: one thread consumes incoming messages, other threads process them). So – well, it was „my fault”.
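
For illustration, a hedged sketch of that pipeline pitfall with the C++ API (not Mekk's actual code; the names are hypothetical):

#include <memory>
#include <librdkafka/rdkafkacpp.h>

// Messages handed off to worker threads keep the client instance referenced,
// so every one of them must be released before the consumer is closed and
// destroyed, otherwise destruction hangs.
using MessagePtr = std::unique_ptr<RdKafka::Message>;

void handoff_example(RdKafka::KafkaConsumer* consumer) {
  MessagePtr msg(consumer->consume(1000 /* ms, illustrative */));
  // ... push std::move(msg) to a worker queue; on shutdown, drain that queue
  // so every MessagePtr is destroyed *before* consumer->close() is called and
  // the consumer itself is deleted ...
}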

But I was able to find it only after applying the patch above, because only then could I proceed, see (my) logs emitted „afterwards”, and set breakpoints. Before that it was practically impossible to debug the situation. So, in the interest of people facing a similar problem, I'd really recommend applying such (or a similar) safety valve.

Perhaps it would also make sense to make those refcnts a bit more informative (count them separately per type) – if the stuck app logged „3 refcnts: 2 messages, 1 connection” instead of just „3 refcnts”, it would be much easier to look for the likely cause of the problem.

PS The fact that the mere existence of a received message object keeps the connection from being closed is also far from obvious. Is it really necessary?

edenhill commented 1 year ago

Glad you found it!

I agree that troubleshooting this should be easier and that the debug logs are not very helpful; remaining object counts would indeed help.

As for allowing asymmetric destruction of objects: no, this would open an absolute can of worms. There is a strict contract that librdkafka is completely done with the client instance once rd_kafka_destroy() returns. If we allowed objects referencing the client instance to remain alive after rd_kafka_destroy() returns, that contract would be broken and it would be very hard to reason about correctness.

mensfeld commented 1 year ago

@edenhill would the same apply to the JSON string that is being published in the statistics cb?

Could its presence also block the destroy?

edenhill commented 1 year ago

That particular callback has a return value for managing memory ownership: return 1 if you want to take ownership of the memory and free it yourself (with rd_kafka_mem_free(NULL, ptr)) at your own discretion.
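
For illustration, a minimal sketch of that contract (the callback name is hypothetical; the signature is the one rd_kafka_conf_set_stats_cb expects):

#include <cstddef>
#include <librdkafka/rdkafka.h>

extern "C" int my_stats_cb(rd_kafka_t* rk, char* json, size_t json_len, void* opaque) {
  (void)rk; (void)json_len; (void)opaque;
  // Return 0: librdkafka keeps ownership and frees `json` right after this
  // callback returns, so the string cannot pin the instance at destroy time.
  // Return 1 only if you keep `json` around; you must then release it yourself
  // later with rd_kafka_mem_free(NULL, json).
  return 0;
}

// Registration (conf is an rd_kafka_conf_t* created with rd_kafka_conf_new()):
//   rd_kafka_conf_set_stats_cb(conf, my_stats_cb);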

Mekk commented 1 year ago

Once we know we are stuck (and if a couple of minutes have passed since we started closing the connection, we can't proceed in any way, and nothing is happening, then we know we are stuck and no change of this state can be expected), generally any behaviour would be more helpful, including terminate(), kill(self, -9), _exit(1) or anything else. Although allowing the app to proceed with the destruction (after a sufficiently alarming warning) is friendlier.

A crashed program will be restarted. A stuck program will stay stuck: it usually won't work, but it will remain there, possibly for a long time, especially in a complicated cloud setup with no direct admin supervision.
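
Purely as an application-side illustration of this point (not a librdkafka feature; the 60-second deadline is arbitrary): a watchdog that gives rd_kafka_destroy() a fixed time to return and exits the process loudly if it never does:

#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>
#include <librdkafka/rdkafka.h>

// Destroy the handle on a helper thread; if it does not return within the
// deadline, assume we are stuck on leaked refcnts and bail out instead of
// hanging forever.
void destroy_or_die(rd_kafka_t* rk,
                    std::chrono::seconds timeout = std::chrono::seconds(60)) {
  auto done = std::async(std::launch::async, [rk] { rd_kafka_destroy(rk); });
  if (done.wait_for(timeout) == std::future_status::timeout) {
    std::cerr << "rd_kafka_destroy() did not return within the deadline; "
                 "some librdkafka object (e.g. an undestroyed message) "
                 "is probably still alive. Exiting.\n";
    std::_Exit(1);  // crash-and-restart beats staying stuck in production
  }
}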

So I really suggest that this scenario (close/destroy of the handle with some remaining refcnts that don't disappear despite a prolonged wait) deserves reconsideration. At the moment rdkafka detects this scenario (OK, almost detects it; full detection is very close, as my patch illustrates) and opts to resolve it with the equivalent of while(true) { sleep(shorttime); }, which is in fact very unfriendly to both the admin (who may face stuck processes that don't log any problems but simply don't stop) and the developer (who, if they manage to reproduce the problem and locate it in a debugger, is left with two threads waiting forever on thread joins and an unclear rdkafka thread looping).

Of course my patch is oversimplified and should be improved (for example by checking that really nothing changes, i.e. any refcnt change or other event resets the deadline; by the aforementioned configurability of this emergency timeout; by a better warning/error message; maybe by additional criteria; maybe by a more brutal way to act), but it does break this fatal loop.

My scenario (an attempt to close the consumer object while a not-yet-destroyed message still exists somewhere) isn't that exotic or unlikely, especially considering how asynchronous everything is. An accidental leak of a message object may also happen (such problems are relatively easy to find with valgrind or LSAN … but only if the program finishes).

OK, I am probably starting to repeat myself, so that's it. But please consider these cases and the reaction to them.