apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Deadlock in concurrent `send_async` calls w/ pulsar-client-3.0 #84

Closed chris-miller-klaviyo closed 1 year ago

chris-miller-klaviyo commented 1 year ago

This script reproduces the deadlock pretty reliably with Python 3.7.13 and pulsar-client-3.0 on an x86 MacBook Pro. We originally noticed this in much more complex producer logic in our CI pipeline on Linux, but this minimal script gets into the same deadlocked state.

import pulsar
from time import sleep

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('persistent://sample/standalone/ns/my-topic')

def send_callback(res, msg):
    # res is a pulsar.Result; msg is the MessageId of the published message
    print(f"Message '{msg}' published res={res}")

for i in range(30):
    producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)

# Sleep to allow sends to complete concurrently before closing the connection
sleep(0.5)

client.close()
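For reference, the tail of the script could also use the producer's blocking flush() instead of a fixed sleep; this is only to make the wait deterministic, not a workaround for the deadlock described below:

# Variant of the last few lines above: flush() blocks until all pending
# async sends have completed, so no sleep duration needs to be guessed.
producer.flush()
client.close()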

With an older pulsar-client version 2.10.2, this works as expected:

❯ python deadlock_repro.py
2023-01-24 15:45:26.883 INFO  [0x700009c56000] ExecutorService:41 | Run io_service in a single thread
2023-01-24 15:45:26.883 INFO  [0x10f520600] ClientConnection:189 | [<none> -> pulsar://127.0.0.1:6660] Create ClientConnection, timeout=10000
2023-01-24 15:45:26.883 INFO  [0x10f520600] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6660
2023-01-24 15:45:26.884 INFO  [0x700009c56000] ClientConnection:375 | [127.0.0.1:50620 -> 127.0.0.1:6660] Connected to broker
2023-01-24 15:45:26.887 INFO  [0x700009d5c000] ExecutorService:41 | Run io_service in a single thread
2023-01-24 15:45:26.887 INFO  [0x700009c56000] HandlerBase:61 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0, ] Getting connection from pool
2023-01-24 15:45:26.887 INFO  [0x700009c56000] HandlerBase:61 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1, ] Getting connection from pool
2023-01-24 15:45:26.887 INFO  [0x700009c56000] HandlerBase:61 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2, ] Getting connection from pool
2023-01-24 15:45:26.887 INFO  [0x700009c56000] HandlerBase:61 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3, ] Getting connection from pool
2023-01-24 15:45:26.888 INFO  [0x700009c56000] ClientConnection:189 | [<none> -> pulsar://127.0.0.1:6660] Create ClientConnection, timeout=10000
2023-01-24 15:45:26.888 INFO  [0x700009c56000] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2023-01-24 15:45:26.889 INFO  [0x700009c56000] ClientConnection:377 | [127.0.0.1:50621 -> 127.0.0.1:6660] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-01-24 15:45:26.892 INFO  [0x700009c56000] ProducerImpl:174 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0, ] Created producer on broker [127.0.0.1:50621 -> 127.0.0.1:6660]
2023-01-24 15:45:26.892 INFO  [0x700009c56000] ProducerImpl:174 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1, ] Created producer on broker [127.0.0.1:50621 -> 127.0.0.1:6660]
2023-01-24 15:45:26.892 INFO  [0x700009c56000] ProducerImpl:174 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2, ] Created producer on broker [127.0.0.1:50621 -> 127.0.0.1:6660]
2023-01-24 15:45:26.893 INFO  [0x700009c56000] ProducerImpl:174 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3, ] Created producer on broker [127.0.0.1:50621 -> 127.0.0.1:6660]
Message '(1160,53,1,-1)' published res=Ok
Message '(1160,54,1,-1)' published res=Ok
Message '(1160,55,1,-1)' published res=Ok
Message '(1160,56,1,-1)' published res=Ok
Message '(1160,57,1,-1)' published res=Ok
Message '(1160,58,1,-1)' published res=Ok
Message '(1160,59,1,-1)' published res=Ok
Message '(1160,60,1,-1)' published res=Ok
Message '(1159,56,3,-1)' published res=Ok
Message '(1159,57,3,-1)' published res=Ok
Message '(1159,58,3,-1)' published res=Ok
Message '(1159,59,3,-1)' published res=Ok
Message '(1158,58,0,-1)' published res=Ok
Message '(1158,59,0,-1)' published res=Ok
Message '(1158,60,0,-1)' published res=Ok
Message '(1158,61,0,-1)' published res=Ok
Message '(1158,62,0,-1)' published res=Ok
Message '(1158,63,0,-1)' published res=Ok
Message '(1158,64,0,-1)' published res=Ok
Message '(1158,65,0,-1)' published res=Ok
Message '(1161,53,2,-1)' published res=Ok
Message '(1161,54,2,-1)' published res=Ok
Message '(1161,55,2,-1)' published res=Ok
Message '(1161,56,2,-1)' published res=Ok
Message '(1159,60,3,-1)' published res=Ok
Message '(1161,57,2,-1)' published res=Ok
Message '(1161,58,2,-1)' published res=Ok
Message '(1161,59,2,-1)' published res=Ok
Message '(1159,61,3,-1)' published res=Ok
Message '(1159,62,3,-1)' published res=Ok
2023-01-24 15:45:27.398 INFO  [0x10f520600] ClientImpl:505 | Closing Pulsar client with 1 producers and 0 consumers
2023-01-24 15:45:27.398 INFO  [0x10f520600] ProducerImpl:651 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0, standalone-0-80] Closing producer for topic persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0
2023-01-24 15:45:27.398 INFO  [0x10f520600] ProducerImpl:651 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1, standalone-0-81] Closing producer for topic persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1
2023-01-24 15:45:27.398 INFO  [0x10f520600] ProducerImpl:651 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2, standalone-0-82] Closing producer for topic persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2
2023-01-24 15:45:27.398 INFO  [0x10f520600] ProducerImpl:651 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3, standalone-0-83] Closing producer for topic persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3
2023-01-24 15:45:27.400 INFO  [0x700009c56000] ProducerImpl:691 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0, standalone-0-80] Closed producer
2023-01-24 15:45:27.400 INFO  [0x700009c56000] ProducerImpl:691 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1, standalone-0-81] Closed producer
2023-01-24 15:45:27.400 INFO  [0x700009c56000] ProducerImpl:691 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2, standalone-0-82] Closed producer
2023-01-24 15:45:27.401 INFO  [0x700009c56000] ProducerImpl:691 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3, standalone-0-83] Closed producer
2023-01-24 15:45:27.401 INFO  [0x700009ddf000] ClientConnection:1563 | [127.0.0.1:50620 -> 127.0.0.1:6660] Connection closed
2023-01-24 15:45:27.401 INFO  [0x700009ddf000] ClientConnection:263 | [127.0.0.1:50620 -> 127.0.0.1:6660] Destroyed connection
2023-01-24 15:45:27.401 INFO  [0x700009ddf000] ClientConnection:1563 | [127.0.0.1:50621 -> 127.0.0.1:6660] Connection closed
2023-01-24 15:45:27.401 INFO  [0x700009c56000] ExecutorService:47 | Event loop of ExecutorService exits successfully
2023-01-24 15:45:27.401 INFO  [0x700009d5c000] ExecutorService:47 | Event loop of ExecutorService exits successfully

However, if I upgrade to pulsar-client==3.0, the script deadlocks and does not respond to SIGINT:

❯ python deadlock_repro.py
2023-01-24 15:41:21.192 INFO  [0x10db56600] ClientConnection:189 | [<none> -> pulsar://127.0.0.1:6660] Create ClientConnection, timeout=10000
2023-01-24 15:41:21.192 INFO  [0x10db56600] ConnectionPool:97 | Created connection for pulsar://127.0.0.1:6660
2023-01-24 15:41:21.194 INFO  [0x7000048cd000] ClientConnection:379 | [127.0.0.1:50470 -> 127.0.0.1:6660] Connected to broker
2023-01-24 15:41:21.208 INFO  [0x7000048cd000] HandlerBase:72 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0, ] Getting connection from pool
2023-01-24 15:41:21.208 INFO  [0x7000048cd000] HandlerBase:72 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1, ] Getting connection from pool
2023-01-24 15:41:21.208 INFO  [0x7000048cd000] HandlerBase:72 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2, ] Getting connection from pool
2023-01-24 15:41:21.209 INFO  [0x7000048cd000] HandlerBase:72 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3, ] Getting connection from pool
2023-01-24 15:41:21.210 INFO  [0x7000048cd000] ClientConnection:189 | [<none> -> pulsar://127.0.0.1:6660] Create ClientConnection, timeout=10000
2023-01-24 15:41:21.210 INFO  [0x7000048cd000] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-01-24 15:41:21.211 INFO  [0x7000048cd000] ClientConnection:381 | [127.0.0.1:50471 -> 127.0.0.1:6660] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-01-24 15:41:21.217 INFO  [0x7000048cd000] ProducerImpl:190 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-3, ] Created producer on broker [127.0.0.1:50471 -> 127.0.0.1:6660]
2023-01-24 15:41:21.217 INFO  [0x7000048cd000] ProducerImpl:190 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-0, ] Created producer on broker [127.0.0.1:50471 -> 127.0.0.1:6660]
2023-01-24 15:41:21.217 INFO  [0x7000048cd000] ProducerImpl:190 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-1, ] Created producer on broker [127.0.0.1:50471 -> 127.0.0.1:6660]
2023-01-24 15:41:21.217 INFO  [0x7000048cd000] ProducerImpl:190 | [persistent://chariot1/chariot_ns_sre--heartbeat/chariot_topic_heartbeat-partition-2, ] Created producer on broker [127.0.0.1:50471 -> 127.0.0.1:6660]

We have observed this on x86 Mac laptops and on Linux (in our CI system, testing a much more complex producer than the script above).

lldb on macOS shows the following thread dump of the deadlocked process:

(lldb) thread list
Process 81000 stopped
* thread #1: tid = 0x167b444, 0x00007ff80a193bd2 libsystem_kernel.dylib`__psynch_mutexwait + 10, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  thread #2: tid = 0x167b445, 0x00007ff80a1943ea libsystem_kernel.dylib`__psynch_cvwait + 10
  thread #3: tid = 0x167b446, 0x00007ff80a1943ea libsystem_kernel.dylib`__psynch_cvwait + 10
  thread #4: tid = 0x167b447, 0x00007ff80a19634e libsystem_kernel.dylib`kevent + 10
(lldb)
(lldb) bt all
* thread #1, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  * frame #0: 0x00007ff80a193bd2 libsystem_kernel.dylib`__psynch_mutexwait + 10
    frame #1: 0x00007ff80a1cbe7e libsystem_pthread.dylib`_pthread_mutex_firstfit_lock_wait + 76
    frame #2: 0x00007ff80a1c9cbb libsystem_pthread.dylib`_pthread_mutex_firstfit_lock_slow + 205
    frame #3: 0x00007ff80a12e739 libc++.1.dylib`std::__1::mutex::lock() + 9
    frame #4: 0x00000001087b7c25 _pulsar.cpython-37m-darwin.so`pulsar::ClientConnection::sendMessage(pulsar::OpSendMsg const&) + 53
    frame #5: 0x00000001088d411a _pulsar.cpython-37m-darwin.so`pulsar::ProducerImpl::sendMessage(pulsar::OpSendMsg const&) + 298
    frame #6: 0x00000001088d2916 _pulsar.cpython-37m-darwin.so`pulsar::ProducerImpl::sendAsyncWithStatsUpdate(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)> const&) + 3526
    frame #7: 0x00000001088d19bc _pulsar.cpython-37m-darwin.so`pulsar::ProducerImpl::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>) + 364
    frame #8: 0x00000001088b9a82 _pulsar.cpython-37m-darwin.so`pulsar::PartitionedProducerImpl::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>) + 914
    frame #9: 0x00000001088c8eb5 _pulsar.cpython-37m-darwin.so`pulsar::Producer::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>) + 149
    frame #10: 0x0000000108771ac4 _pulsar.cpython-37m-darwin.so`void pybind11::detail::argument_loader<pulsar::Producer*, pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)> >::call_impl<void, pybind11::cpp_function::cpp_function<void, pulsar::Producer, pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Producer::*)(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Producer*, pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>)&, 0ul, 1ul, 2ul, pybind11::detail::void_type>(pulsar::Producer&&, pybind11::detail::index_sequence<0ul, 1ul, 2ul>, pybind11::detail::void_type&&) && + 212
    frame #11: 0x0000000108770f2f _pulsar.cpython-37m-darwin.so`void pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void, pulsar::Producer, pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Producer::*)(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Producer*, pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>), void, pulsar::Producer*, pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, pybind11::name, pybind11::is_method, pybind11::sibling>(void&&, pulsar::Producer (*)(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pybind11::detail::function_call&)::operator()(pybind11::detail::function_call&) const + 255
    frame #12: 0x000000010871420d _pulsar.cpython-37m-darwin.so`pybind11::cpp_function::dispatcher(_object*, _object*, _object*) + 4733
    frame #13: 0x000000010597218a python3.7`_PyMethodDef_RawFastCallKeywords + 714
    frame #14: 0x00000001059711ac python3.7`_PyObject_FastCallKeywords + 332
    frame #15: 0x0000000105a47ca5 python3.7`call_function + 773
    frame #16: 0x0000000105a44428 python3.7`_PyEval_EvalFrameDefault + 28344
    frame #17: 0x0000000105a489a8 python3.7`_PyEval_EvalCodeWithName + 2888
    frame #18: 0x0000000105971444 python3.7`_PyFunction_FastCallKeywords + 228
    frame #19: 0x0000000105a47cac python3.7`call_function + 780
    frame #20: 0x0000000105a44574 python3.7`_PyEval_EvalFrameDefault + 28676
    frame #21: 0x0000000105a489a8 python3.7`_PyEval_EvalCodeWithName + 2888
    frame #22: 0x0000000105a3d4d0 python3.7`PyEval_EvalCode + 48
    frame #23: 0x0000000105a801de python3.7`PyRun_FileExFlags + 174
    frame #24: 0x0000000105a7f77e python3.7`PyRun_SimpleFileExFlags + 270
    frame #25: 0x0000000105aa165e python3.7`pymain_main + 6622
    frame #26: 0x0000000105aa202f python3.7`_Py_UnixMain + 111
    frame #27: 0x000000010dadb52e dyld`start + 462
  thread #2
    frame #0: 0x00007ff80a1943ea libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007ff80a1cea6f libsystem_pthread.dylib`_pthread_cond_wait + 1249
    frame #2: 0x0000000105a3cb9f python3.7`take_gil + 255
    frame #3: 0x0000000105a3cfb3 python3.7`PyEval_AcquireThread + 19
    frame #4: 0x000000010870be43 _pulsar.cpython-37m-darwin.so`pybind11::gil_scoped_acquire::gil_scoped_acquire() + 83
    frame #5: 0x00000001087714f3 _pulsar.cpython-37m-darwin.so`pybind11::detail::type_caster<std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, void>::load(pybind11::handle, bool)::func_handle::func_handle(func_handle const&) + 35
    frame #6: 0x00000001087715f1 _pulsar.cpython-37m-darwin.so`std::__1::__function::__func<pybind11::detail::type_caster<std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, void>::load(pybind11::handle, bool)::func_wrapper, std::__1::allocator<pybind11::detail::type_caster<std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, void>::load(pybind11::handle, bool)::func_wrapper>, void (pulsar::Result, pulsar::MessageId const&)>::__clone() const + 49
    frame #7: 0x00000001088daccd _pulsar.cpython-37m-darwin.so`std::__1::__function::__func<pulsar::ProducerImpl::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>)::$_2, std::__1::allocator<pulsar::ProducerImpl::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>)::$_2>, void (pulsar::Result, pulsar::MessageId const&)>::__clone() const + 93
    frame #8: 0x000000010878f7de _pulsar.cpython-37m-darwin.so`pulsar::OpSendMsg::OpSendMsg(pulsar::OpSendMsg const&) + 126
    frame #9: 0x00000001087f0f85 _pulsar.cpython-37m-darwin.so`boost::any::holder<pulsar::OpSendMsg>::clone() const + 53
    frame #10: 0x00000001087b864a _pulsar.cpython-37m-darwin.so`pulsar::ClientConnection::sendPendingCommands() + 106
    frame #11: 0x00000001087f6812 _pulsar.cpython-37m-darwin.so`boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<std::__1::__bind<void (pulsar::ClientConnection::*)(boost::system::error_code const&), std::__1::shared_ptr<pulsar::ClientConnection>, std::__1::placeholders::__ph<1> const&> > >::operator()(boost::system::error_code, unsigned long, int) + 434
    frame #12: 0x00000001087f6b60 _pulsar.cpython-37m-darwin.so`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<std::__1::__bind<void (pulsar::ClientConnection::*)(boost::system::error_code const&), std::__1::shared_ptr<pulsar::ClientConnection>, std::__1::placeholders::__ph<1> const&> > >, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) + 320
    frame #13: 0x00000001087c0946 _pulsar.cpython-37m-darwin.so`boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) + 694
    frame #14: 0x00000001087c0481 _pulsar.cpython-37m-darwin.so`boost::asio::detail::scheduler::run(boost::system::error_code&) + 321
    frame #15: 0x0000000108866ca7 _pulsar.cpython-37m-darwin.so`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, pulsar::ExecutorService::start()::$_0> >(void*) + 119
    frame #16: 0x00007ff80a1ce4e1 libsystem_pthread.dylib`_pthread_start + 125
    frame #17: 0x00007ff80a1c9f6b libsystem_pthread.dylib`thread_start + 15
  thread #3
    frame #0: 0x00007ff80a1943ea libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007ff80a1cea6f libsystem_pthread.dylib`_pthread_cond_wait + 1249
    frame #2: 0x00000001087c0873 _pulsar.cpython-37m-darwin.so`boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) + 483
    frame #3: 0x00000001087c0481 _pulsar.cpython-37m-darwin.so`boost::asio::detail::scheduler::run(boost::system::error_code&) + 321
    frame #4: 0x00000001087c0334 _pulsar.cpython-37m-darwin.so`boost::asio::detail::posix_thread::func<boost::asio::detail::resolver_service_base::work_scheduler_runner>::run() + 36
    frame #5: 0x00000001087c02e0 _pulsar.cpython-37m-darwin.so`boost_asio_detail_posix_thread_function + 16
    frame #6: 0x00007ff80a1ce4e1 libsystem_pthread.dylib`_pthread_start + 125
    frame #7: 0x00007ff80a1c9f6b libsystem_pthread.dylib`thread_start + 15
  thread #4
    frame #0: 0x00007ff80a19634e libsystem_kernel.dylib`kevent + 10
    frame #1: 0x00000001087bf587 _pulsar.cpython-37m-darwin.so`boost::asio::detail::kqueue_reactor::run(long, boost::asio::detail::op_queue<boost::asio::detail::scheduler_operation>&) + 327
    frame #2: 0x00000001087c07b4 _pulsar.cpython-37m-darwin.so`boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) + 292
    frame #3: 0x00000001087c0481 _pulsar.cpython-37m-darwin.so`boost::asio::detail::scheduler::run(boost::system::error_code&) + 321
    frame #4: 0x0000000108866ca7 _pulsar.cpython-37m-darwin.so`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, pulsar::ExecutorService::start()::$_0> >(void*) + 119
    frame #5: 0x00007ff80a1ce4e1 libsystem_pthread.dylib`_pthread_start + 125
    frame #6: 0x00007ff80a1c9f6b libsystem_pthread.dylib`thread_start + 15

I'm not a pybind/Boost expert, but it looks to me like pybind11 is acquiring the GIL in a way that causes deadlocks that didn't occur before pybind11 was introduced.
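To sketch what the two backtraces above suggest, here is a minimal Python model of the lock ordering (the gil and connection_mutex locks below are illustrative stand-ins, not real client objects): the main thread holds the GIL and then waits on the connection mutex, while the I/O thread holds the connection mutex and then waits on the GIL.

import threading
import time

gil = threading.Lock()               # stands in for the CPython GIL
connection_mutex = threading.Lock()  # stands in for ClientConnection's internal mutex

def main_thread_send_async():
    with gil:                        # Python caller enters send_async holding the GIL
        time.sleep(0.1)              # widen the race window for the demo
        with connection_mutex:       # ClientConnection::sendMessage locks the mutex
            pass

def io_thread_send_pending():
    with connection_mutex:           # sendPendingCommands runs with the mutex held
        time.sleep(0.1)
        with gil:                    # copying OpSendMsg copies the pybind11 callback,
            pass                     # which acquires the GIL -> lock-order inversion

t1 = threading.Thread(target=main_thread_send_async)
t2 = threading.Thread(target=io_thread_send_pending)
t1.start(); t2.start()
# With this ordering both threads block forever, mirroring threads #1 and #2 above.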

Do you have a sense of what might be causing this deadlock, and how to fix it?

chris-miller-klaviyo commented 1 year ago

I also notice that the "Async producer" example code in the docs hits this same deadlock with pulsar-client 3.0 in my testing.

https://pulsar.staged.apache.org/docs/v1.21.0-incubating/clients/Python/#Asyncproducerexample-1zg7id

zbentley commented 1 year ago

This seems like a fundamental breakage. The C++ code doesn't look GIL-safe at all. One of the main happy paths with this client seems to potentially corrupt interpreter state (getting just a deadlock, as in this issue, seems like a happy coincidence; operating outside of the GIL could break anything, right?).

Would this merit pulling or otherwise hot-fixing the PyPI package?

merlimat commented 1 year ago

@zbentley It's mostly a difference between Boost::Python and pybind11.

"The C++ code doesn't look GIL-safe at all."

It's only the sendAsync that is impacted.
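(For anyone who needs a stopgap before a fixed release is out: since only sendAsync is affected, the blocking send() path can be used instead. A minimal sketch, assuming the same client and producer setup as in the repro script above:)

# Blocking variant of the repro loop; send() does not return until the broker
# has acknowledged the message, so no concurrent send_async calls are in flight.
for i in range(30):
    producer.send(f"Hello-{i}".encode('utf-8'))
    print(f"Message {i} published")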

"One of the main happy paths with this client seems to potentially corrupt interpreter state"

The problem is the reverse: doing work with the mutex locked while it should have been unlocked. I don't see any potential for corrupting the interpreter state.

"Would this merit pulling or otherwise hot-fixing the PyPI package?"

We cannot pull packages from PyPI, though we'll get a release out ASAP.

zbentley commented 1 year ago

@merlimat can you expand on "It's only the sendAsync that is impacted"?

I also was a little confused as to how GIL handling works in the fix PR: https://github.com/apache/pulsar-client-python/pull/87/files#r1093385236

Unless something handles grabbing the GIL when the callback is triggered, it seems like that PR may resolve the deadlock at the expense of making the Python interactions non-thread-safe.

merlimat commented 1 year ago

@zbentley

can you expand on "It's only the sendAsync that is impacted"?

In this Python wrapper we mostly expose sync methods, e.g. client.create_producer(), etc. The only async method is producer.send_async().

For sync methods, we internally use the C++ async version (to avoid blocking when checking for Python interruptions) and handle the GIL properly. We have a function for that purpose: https://github.com/apache/pulsar-client-python/blob/main/src/utils.h#L60-L87
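Roughly, the pattern that helper implements looks like this at the Python level (an illustrative sketch only; the real code is C++ and manages the GIL through pybind11):

import concurrent.futures
import threading
import time

def wait_for_async_result(start_async, timeout_slice=0.1):
    # Kick off the asynchronous operation, then wait in short slices so the
    # calling thread stays responsive to interruptions (e.g. Ctrl-C) instead
    # of blocking indefinitely inside native code.
    future = concurrent.futures.Future()
    start_async(future)  # expected to complete the future from another thread
    while True:
        try:
            return future.result(timeout=timeout_slice)
        except concurrent.futures.TimeoutError:
            continue  # pending signals/interrupts can be delivered here

# Toy usage: pretend the client library completes the operation shortly after.
def fake_async_op(future):
    threading.Thread(target=lambda: (time.sleep(0.5), future.set_result("Ok"))).start()

print(wait_for_async_result(fake_async_op))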

"Unless something handles grabbing the GIL when the callback is triggered, it seems like that PR may resolve the deadlock at the expense of making the Python interactions non-thread-safe."

You can see in the stack trace above that pybind11 already automatically acquires the GIL when triggering the Python callback:

thread #2
    frame #0: 0x00007ff80a1943ea libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007ff80a1cea6f libsystem_pthread.dylib`_pthread_cond_wait + 1249
    frame #2: 0x0000000105a3cb9f python3.7`take_gil + 255
    frame #3: 0x0000000105a3cfb3 python3.7`PyEval_AcquireThread + 19
    frame #4: 0x000000010870be43 _pulsar.cpython-37m-darwin.so`pybind11::gil_scoped_acquire::gil_scoped_acquire() + 83
    frame #5: 0x00000001087714f3 _pulsar.cpython-37m-darwin.so`pybind11::detail::type_caster<std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, void>::load(pybind11::handle, bool)::func_handle::func_handle(func_handle const&) + 35
    frame #6: 0x00000001087715f1 _pulsar.cpython-37m-darwin.so`std::__1::__function::__func<pybind11::detail::type_caster<std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, void>::load(pybind11::handle, bool)::func_wrapper, std::__1::allocator<pybind11::detail::type_caster<std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>, void>::load(pybind11::handle, bool)::func_wrapper>, void (pulsar::Result, pulsar::MessageId const&)>::__clone() const + 49
    frame #7: 0x00000001088daccd _pulsar.cpython-37m-darwin.so`std::__1::__function::__func<pulsar::ProducerImpl::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>)::$_2, std::__1::allocator<pulsar::ProducerImpl::sendAsync(pulsar::Message const&, std::__1::function<void (pulsar::Result, pulsar::MessageId const&)>)::$_2>, void (pulsar::Result, pulsar::MessageId const&)>::__clone() const + 93
    frame #8: 0x000000010878f7de _pulsar.cpython-37m-darwin.so`pulsar::OpSendMsg::OpSendMsg(pulsar::OpSendMsg const&) + 126
zbentley commented 1 year ago

That makes sense @merlimat, thanks. What about "async" functionality (I think it's really "C++ deciding to call Python" rather than the other way around) like the Python logger or the consumer's on-message callbacks?

merlimat commented 1 year ago

The logger and the message listeners should be fine since they are both going to acquire the GIL when entering the Python callbacks.

This assumes that, after this PR, there are no other places that take Pulsar client internal locks while holding the GIL, as was the case with send_async().
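For reference, the two callback paths being discussed, sketched with the documented Python API (the topic and subscription names are placeholders):

import logging
import time
import pulsar

# Custom logger: C++ log lines are handed to this Python logger, and pybind11
# acquires the GIL before calling into it.
logger = logging.getLogger('pulsar-client')

# Message listener: invoked from a client thread for each received message,
# again with the GIL acquired before entering Python.
def on_message(consumer, msg):
    print(f"Received {msg.data()}")
    consumer.acknowledge(msg)

client = pulsar.Client('pulsar://localhost:6650', logger=logger)
consumer = client.subscribe('my-topic', 'my-subscription',
                            message_listener=on_message)

time.sleep(10)  # let the listener run for a bit in this sketch
client.close()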

zbentley commented 1 year ago

I understand now. Thank you!

merlimat commented 1 year ago

Thanks for the clear repro code and the above info. That made it really easy!

chris-miller-klaviyo commented 1 year ago

Hi, is there a chance the latest release candidate might be published soon? 🙏 We're looking forward to having this bug fixed in an official release! 🎉