fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License
1.63k stars 280 forks source link

Deadlock in `BaseConsumer::drop` after `store_offsets` #453

Open jorendorff opened 2 years ago

jorendorff commented 2 years ago

@mbellani and I observed hangs when shutting down our program, which uses BaseConsumer with these ClientConfig options:

        .set("enable.partition.eof", "false")
        .set("enable.auto.offset.store", "false") // application will set consumer offsets explicitly
        .set("enable.auto.commit", "true") // same as default value: let librdkafka handle committing offsets

The program hangs when dropping the BaseConsumer. Dropping the consumer calls rd_kafka_destroy which never returns.

The minimized test case is just 52 lines of code.

Full steps to reproduce: https://github.com/jorendorff/librdkafka-hang#setup

jorendorff commented 2 years ago

I don't know if the problem is in our application, in rust-rdkafka, or in librdkafka.

Looking at the hung process in lldb on a Mac, there are three threads:

(lldb) thread list
Process 40283 stopped
* thread #1: tid = 0x1a27eb4, 0x00007fff204279ee libsystem_kernel.dylib`__ulock_wait + 10, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  thread #2: tid = 0x1a27ec0, 0x00007fff204279ee libsystem_kernel.dylib`__ulock_wait + 10, name = 'rdk:main'
  thread #3: tid = 0x1a27ec2, 0x00007fff20428cde libsystem_kernel.dylib`__psynch_cvwait + 10, name = 'rdk:broker1'

Thread 1, the main thread, is waiting for another thread to exit:

* thread #1, queue = 'com.apple.main-thread'
  * frame #0: 0x00007fff204279ee libsystem_kernel.dylib`__ulock_wait + 10
    frame #1: 0x00007fff2045cf60 libsystem_pthread.dylib`_pthread_join + 362
    frame #2: 0x00000001030c40fd rdkafka-hang`thrd_join(thr=0x0000700007130000, res=0x00007ffeecd078b0) at tinycthread.c:692:7
    frame #3: 0x0000000102f896c0 rdkafka-hang`rd_kafka_destroy_app(rk=0x00007fde2580c600, flags=0) at rdkafka.c:1115:13
    frame #4: 0x0000000102f890a7 rdkafka-hang`rd_kafka_destroy(rk=0x00007fde2580c600) at rdkafka.c:1128:9
    frame #5: 0x0000000102f60dc0 rdkafka-hang`_$LT$rdkafka..util..NativePtr$LT$T$GT$$u20$as$u20$core..ops..drop..Drop$GT$::drop::h91a002b5e3ed3107(self=0x00007ffeecd07e20) at util.rs:265:18
    frame #6: 0x0000000102f522c1 rdkafka-hang`core::ptr::drop_in_place$LT$rdkafka..util..NativePtr$LT$rdkafka_sys..bindings..rd_kafka_s$GT$$GT$::h7b0513153c42e4c4((null)=0x00007ffeecd07e20) at mod.rs:188:1
    frame #7: 0x0000000102f51b01 rdkafka-hang`core::ptr::drop_in_place$LT$rdkafka..client..NativeClient$GT$::h5eeab5322a31a161((null)=0x00007ffeecd07e20) at mod.rs:188:1
    frame #8: 0x0000000102f01c45 rdkafka-hang`core::ptr::drop_in_place$LT$rdkafka..client..Client$LT$rdkafka..consumer..DefaultConsumerContext$GT$$GT$::hc9e43582c807fa29((null)=0x00007ffeecd07e20) at mod.rs:188:1
    frame #9: 0x0000000102f01b70 rdkafka-hang`core::ptr::drop_in_place$LT$rdkafka..consumer..base_consumer..BaseConsumer$GT$::h092dcce269646b61((null)=0x00007ffeecd07e20) at mod.rs:188:1
    frame #10: 0x0000000102f007db rdkafka-hang`rdkafka_hang::run_test::h056fe3d7cc698f7e(store_offsets=true) at main.rs:52:1
    frame #11: 0x0000000102f0005a rdkafka-hang`rdkafka_hang::main::h735e3dfb757c8004 at main.rs:11:5

Thread 2, rdk:main, also appears to be waiting for another thread to finish.

* thread #2, name = 'rdk:main'
  * frame #0: 0x00007fff204279ee libsystem_kernel.dylib`__ulock_wait + 10
    frame #1: 0x00007fff2045cf60 libsystem_pthread.dylib`_pthread_join + 362
    frame #2: 0x00000001030c40fd rdkafka-hang`thrd_join(thr=0x0000700007236000, res=0x000070000712fdac) at tinycthread.c:692:7
    frame #3: 0x0000000102f8cb9e rdkafka-hang`rd_kafka_destroy_internal(rk=0x00007fde2580c600) at rdkafka.c:1280:21
    frame #4: 0x0000000102f8c046 rdkafka-hang`rd_kafka_thread_main(arg=0x00007fde2580c600) at rdkafka.c:2085:9
    frame #5: 0x00000001030c40a1 rdkafka-hang`_thrd_wrapper_function(aArg=0x00007fde25406ed0) at tinycthread.c:576:9
    frame #6: 0x00007fff2045b8fc libsystem_pthread.dylib`_pthread_start + 224
    frame #7: 0x00007fff20457443 libsystem_pthread.dylib`thread_start + 15

Thread 3, rdk:broker1, seems to be stuck in this loop in rd_kafka_broker_thread_main:

* thread #3, name = 'rdk:broker1', stop reason = step over
  * frame #0: 0x0000000102fa0a3e rdkafka-hang`rd_kafka_broker_thread_main(arg=0x00007fde25815c00) at rdkafka_broker.c:5268:17
    frame #1: 0x00000001030c40a1 rdkafka-hang`_thrd_wrapper_function(aArg=0x00007fde25407110) at tinycthread.c:576:9
    frame #2: 0x00007fff2045b8fc libsystem_pthread.dylib`_pthread_start + 224
    frame #3: 0x00007fff20457443 libsystem_pthread.dylib`thread_start + 15
gravaton commented 2 years ago

Thanks so much for the detailed report and reproduction steps, going to validate that I can reproduce and then see if I can't untangle what's going on here.

mbellani commented 2 years ago

thanks @gravaton for taking a look, FWIW we also tried setting enable.auto.commit to false and it still freezes. What we're moving forward with atm is not calling store_offsets for now and calling consumer.commit(&topic_partition, CommitMode::Sync) every once in a while to report offsets back to kafka.

look commented 2 years ago

You might be wondering why we are doing this. We aren't using Kafka-tracked consumer group offsets, but we have been storing the offsets in Kafka in order to use some observability tools.

jorendorff commented 2 years ago

@gravaton I wonder if it's just a matter of this call to rd_kafka_commit not being included the rebalance callback here.

jorendorff commented 2 years ago

Update: I don't think that's it. At least, I wrote a C test program that reproduces the issue, just like the Rust version, and adding a call to rd_kafka_commit did not fix the C program.

gravaton commented 2 years ago

So I've been doing a bit of digging here and I've gotten some interesting results. The problem seems to revolve around the (re)usage of the TopicPartitionList object. I first found that if we change this block:

if store_offsets {
    let offset = Offset::Offset(msg.offset());
    topic_partition.set_all_offsets(offset).unwrap();
    consumer.store_offsets(&topic_partition).unwrap();
}

To this:

if store_offsets {
    consumer.store_offset_from_message(&msg).unwrap();
}

Everything seems to work properly. Of course, we're no longer really doing the same operation and, in fact, store_offset_from_message() follows a substantially different codepath internally (it actually ends up hitting rd_kafka_offset_store() which stores offset + 1 as opposed to rd_kafka_offsets_store() which stores offset - subtle but potentially important). This might be a useful workaround depending on your use case however. But after some more investigation I found that if you change it to this as well:

if store_offsets {
    let offset = Offset::Offset(msg.offset());
    let mut current_partitions = topic_partition.clone();
    current_partitions.set_all_offsets(offset).unwrap();
    consumer.store_offsets(&current_partitions).unwrap();
}

it also works just fine! It seems like the problem only manifests if you continuously reuse that first structure. I'm going to keep digging down to understand precisely what's going on here, it's definitely an extra reference being held somehow but finding precisely where is going to be a bit tricky. Hopefully this is a helpful beginning, stay tuned!

jorendorff commented 2 years ago

The problem is inside librdkafka. rd_kafka_topic_partition_ensure_toppar bumps the reference count of a toppar and stores it in a private field inside the partition topic list. This is the extra reference that's causing the problem. The list must then be destroyed before the rd_kafka_t.

I checked that doing drop(topic_partition) just before leaving main() also makes the hang go away.

gravaton commented 2 years ago

Heh I was midway through a larger writeup, but yep that appears to be the issue. librdkafka will cache those toppar references in the partition topic list and then failing to dispose of them in the right order will cause the client destruction to hang since it's done synchronously.

Practically, initializing the TopicPartitionList AFTER the client is created will cause them to be destroyed in the correct order with minimum change to your code, or you can do explicit drops as mentioned.

However in terms of this library I'd say the signature of Consumer::store_offsets is a bit deceptive since it actually is guaranteed to perform a mutating operation instead of a non-mutating one. It might also be good to see if we can find a way to express that running a TopicPartitionlist through Consumer::store_offsets (or anywhere else that might do something similar, I haven't searched exhaustively) causes a lifetime dependency on the Consumer it's going to be getting those references from.

jorendorff commented 2 years ago

Right. It might be worth waiting to see if the librdkafka folks consider this a bug or not. Who knows, they might just fix it :)

I filed an issue upstream: edenhill/librdkafka#3796

jorendorff commented 2 years ago

Well, that was quick. It's not a bug; it's clearly documented here:

All objects except for the handle (C: rd_kafka_t, C++: Consumer,KafkaConsumer,Producer), such as topic objects, messages, topic_partition_t, TopicPartition, events, etc, MUST be destroyed/deleted prior to destroying or closing the handle.

For C, make sure the following objects are destroyed prior to calling rd_kafka_consumer_close() and rd_kafka_destroy():

  • rd_kafka_message_t
  • rd_kafka_topic_t
  • rd_kafka_topic_partition_t
  • rd_kafka_topic_partition_list_t
  • rd_kafka_event_t
  • rd_kafka_queue_t
jorendorff commented 2 years ago

As @edenhill put it:

all objects that have been used with a specific rd_kafka_t instance must be destroyed prior to destroying that rd_kafka_t instance.

"have been used with": Hmm. This rule won't be easy to express exactly using Rust's type and borrow system. It might be best enforced with assertions rather than lifetimes, or by making the TopicPartitionList also hold a reference to the client once they are used together.

It seems to follow that a topic partition list must not be used with more than one rd_kafka_t (and the same for those other objects)—something else to consider asserting...

duarten commented 2 years ago

Maybe TopicPartitionListElem should be #[must_use] and implement Drop (which would call rd_kafka_topic_partition_list_del)?

gravaton commented 2 years ago

Taking a look at BorrowedMessage there might be a reasonably elegant way to do this using PhantomData and tying the lifetimes together but I'm going to have to do some experimenting...