apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

bad_weak_ptr will be thrown if client.close() is not called after ResultInterrupted is caught #103

Open BewareMyPower opened 1 year ago

BewareMyPower commented 1 year ago

Reproduce:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', "my-sub")

while True:
    try:
        msg = consumer.receive()
        consumer.acknowledge(msg)
    except Exception as e:
        print("Exception: {}".format(type(e)))
        break

Press Ctrl+C and you will see:

^CException: <class '_pulsar.Interrupted'>
2023-03-09 22:32:39.870 WARN  [139780853032768] ConsumerImpl:126 | [persistent://public/default/my-topic, my-sub, 0] Destroyed consumer which was not properly closed
2023-03-09 22:32:39.870 INFO  [139780853032768] ConsumerImpl:134 | [persistent://public/default/my-topic, my-sub, 0] Closed consumer for race condition: 0
terminate called after throwing an instance of 'std::bad_weak_ptr'
  what():  bad_weak_ptr
Aborted
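
Going by the issue title, the abort should not occur when client.close() is called after the exception is caught. A minimal sketch of that workaround follows. The helper name consume_until_interrupted is hypothetical and not part of the pulsar package. The client argument is assumed to behave like pulsar.Client, using only the subscribe, receive, acknowledge, and close calls shown in the reproduction and the title.

```python
def consume_until_interrupted(client, topic, subscription):
    """Receive and acknowledge messages until an exception is raised
    (for example the Interrupted exception seen on Ctrl+C), then close
    the client before returning.

    `client` is assumed to behave like pulsar.Client.
    Returns the number of messages acknowledged.
    """
    acked = 0
    try:
        consumer = client.subscribe(topic, subscription)
        while True:
            try:
                msg = consumer.receive()
                consumer.acknowledge(msg)
                acked += 1
            except Exception as e:
                print("Exception: {}".format(type(e)))
                break
    finally:
        # Close explicitly instead of leaving cleanup to object
        # destruction at interpreter exit, which is where the
        # reported bad_weak_ptr abort occurs.
        client.close()
    return acked
```

With the real library this would be called as consume_until_interrupted(pulsar.Client('pulsar://localhost:6650'), 'my-topic', 'my-sub'). The finally block is a design choice so that close() runs whichever exception ends the loop.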

With debug-level logging enabled, you will see:

2023-03-09 22:33:41.191 DEBUG [140614849333056] ConsumerImpl:120 | [persistent://public/default/my-topic, my-sub, 0] ~ConsumerImpl
2023-03-09 22:33:41.191 WARN  [140614849333056] ConsumerImpl:126 | [persistent://public/default/my-topic, my-sub, 0] Destroyed consumer which was not properly closed
2023-03-09 22:33:41.191 INFO  [140614849333056] ConsumerImpl:134 | [persistent://public/default/my-topic, my-sub, 0] Closed consumer for race condition: 0
2023-03-09 22:33:41.191 DEBUG [140614849333056] AckGroupingTrackerEnabled:111 | Reference to the HandlerBase is not valid.
terminate called after throwing an instance of 'std::bad_weak_ptr'
  what():  bad_weak_ptr
2023-03-09 22:33:41.193 DEBUG [140614821328640] ClientConnection:814 | [127.0.0.1:39724 -> 127.0.0.1:6650] Handling incoming command: SUCCESS
2023-03-09 22:33:41.193 DEBUG [140614821328640] ClientConnection:904 | [127.0.0.1:39724 -> 127.0.0.1:6650] Received success response from server. req_id: 1
Aborted

The stack:

#7  0x00007ffff68be5aa in _Unwind_Resume () from /lib/x86_64-linux-gnu/libgcc_s.so.1
#8  0x00007ffff6ba7a99 in pulsar::ConsumerImpl::shutdown() [clone .cold] ()
   from /usr/local/lib/python3.8/dist-packages/pulsar_client.libs/libpulsar-9425ac2e.so
#9  0x00007ffff6c98d74 in pulsar::ConsumerImpl::~ConsumerImpl() () from /usr/local/lib/python3.8/dist-packages/pulsar_client.libs/libpulsar-9425ac2e.so
#10 0x00007ffff7524bfa in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
   from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
#11 0x00007ffff755b28c in pybind11::class_<pulsar::Consumer>::dealloc(pybind11::detail::value_and_holder&) ()
   from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
#12 0x00007ffff753240e in pybind11::detail::clear_instance(_object*) () from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so
#13 0x00007ffff753305f in pybind11_object_dealloc () from /usr/local/lib/python3.8/dist-packages/_pulsar.cpython-38-x86_64-linux-gnu.so

BewareMyPower commented 1 year ago

Summary

This issue is hard to avoid because it occurs when the destructor of a std::shared_ptr in libpulsar.so runs after the Python interpreter has already unloaded libpulsar.so.

Analysis

The stack shows that the crash happened in ConsumerImpl::shutdown:

#10 std::shared_ptr<pulsar::ClientImpl>::~shared_ptr (this=<optimized out>, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#11 pulsar::ConsumerImpl::shutdown (this=<optimized out>) at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:1267
#12 0x00007ffff7123a5e in pulsar::ConsumerImpl::~ConsumerImpl (this=0x7ffff0003250, __in_chrg=<optimized out>)
    at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:164

https://github.com/apache/pulsar-client-cpp/blob/1dad87bb3b804d2aa8542ac48e4c35228ac2f1bf/lib/ConsumerImpl.cc#L1267

    auto client = client_.lock();

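Even after I commented out the client_.lock() related code, the crash still happened, this time at the Message object created in failPendingReceiveCallback, which is called from shutdown (the stack shows Message::~Message):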

#10 std::shared_ptr<pulsar::MessageImpl>::~shared_ptr (this=<optimized out>, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#11 pulsar::Message::~Message (this=<optimized out>, __in_chrg=<optimized out>) at /home/xyz/github.com/apache/pulsar-client-cpp/include/pulsar/Message.h:43
#12 pulsar::ConsumerImpl::failPendingReceiveCallback (this=<optimized out>) at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:622
#13 0x00007ffff7122cd0 in pulsar::ConsumerImpl::shutdown (this=0x7ffff0003250) at /home/xyz/github.com/apache/pulsar-client-cpp/lib/ConsumerImpl.cc:1274

https://github.com/apache/pulsar-client-cpp/blob/1dad87bb3b804d2aa8542ac48e4c35228ac2f1bf/lib/ConsumerImpl.cc#L622

    Message msg;