apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Deadlock in python client when using send_async with flush #122

Closed jvstein closed 1 year ago

jvstein commented 1 year ago

Describe the bug

The example code for the Python client's send_async method does not work for a process that exits after sending.

The example needs a producer.flush() call, but adding it introduces a deadlock.

To Reproduce

Run this modified version of the example code against a Pulsar cluster. The changes reduce the output text, replace while True with for i in range(50000), and add a producer.flush() call. The flush call is necessary to publish all messages; if it is omitted, some messages are skipped.

```python
import itertools
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    # Topic must have at least 2 partitions.
    #   bin/pulsar-admin topics create-partitioned-topic persistent://public/default/python-client-122 --partitions 2
    topic='persistent://public/default/python-client-122',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10
)

counter = itertools.count(start=1)
def callback(res, msg_id):
    value = next(counter)
    if value % 100 == 0:
        print(f'Published {value} records')

for i in range(50000):
    producer.send_async(('Hello-%d' % i).encode('utf-8'), callback)

producer.flush()
client.close()
```

Desktop:

- pulsar-client==3.1.0
- Python 3.9.13
- Ubuntu 22.04.2 LTS
- Pulsar 2.10.2 running in a Kubernetes cluster; the client connects via pulsar-proxy running as a LoadBalancer Service

Additional context / Fix

Adding a time.sleep(1) before the producer.flush() call lets the process exit cleanly in every run I've tested. This suggests the deadlock occurs somewhere between clearing the active batch and flushing the producer.
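Instead of a fixed time.sleep(1), one option is to wait until every send_async callback has fired. The helper below is a hypothetical sketch (PendingSends and wrap are not part of pulsar-client). It assumes callbacks are invoked with (res, msg_id), as in the example, and that the batching timer (batching_max_publish_delay_ms) eventually sends the final partial batch without an explicit flush(), which is what the sleep workaround appears to rely on.

```python
import threading


class PendingSends:
    """Hypothetical helper: counts in-flight send_async calls so the caller
    can wait until every callback has run."""

    def __init__(self):
        self._cond = threading.Condition()
        self._pending = 0
        self.completed = 0

    def wrap(self, callback):
        # Call once per send_async; registers one pending send and returns
        # a callback that marks it done after the user's callback runs.
        with self._cond:
            self._pending += 1

        def done(res, msg_id):
            try:
                callback(res, msg_id)
            finally:
                with self._cond:
                    self._pending -= 1
                    self.completed += 1
                    self._cond.notify_all()

        return done

    def wait(self, timeout=None):
        """Block until nothing is pending; returns False if the timeout expires."""
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout=timeout)
```

In the repro this would mean calling producer.send_async(data, tracker.wrap(callback)) in the loop and tracker.wait(timeout=30) in place of producer.flush(). Blocking on a Condition releases the GIL, so callbacks arriving on the client's I/O threads can still run.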

Possibly related: https://github.com/apache/pulsar/issues/5666

merlimat commented 1 year ago

@jvstein This looks the same as #84 which was fixed by #87 and released in the 3.1.0 release.

Could something in your setup still be using the 3.0.0 release? I just ran a quick test with the code above: it works with 3.1.0 and fails, as expected, with 3.0.0.
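One way to check which build a process actually picks up is to compare the installed pulsar-client distribution version with the location of the _pulsar native extension (the module name that appears in the stack traces later in this thread). This is a stdlib-only sketch with hypothetical helper names; it assumes it is run with the same interpreter and virtualenv as the failing process.

```python
import importlib.metadata
import importlib.util
import re


def installed_version(dist_name):
    """Version of an installed distribution, or None if it is not installed."""
    try:
        return importlib.metadata.version(dist_name)
    except importlib.metadata.PackageNotFoundError:
        return None


def module_origin(module_name):
    """File a top-level module would be imported from, or None if not found."""
    spec = importlib.util.find_spec(module_name)
    return spec.origin if spec is not None else None


def version_tuple(version):
    """'3.1.0' -> (3, 1, 0); parsing stops at the first non-numeric part."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


if __name__ == "__main__":
    version = installed_version("pulsar-client")
    print("pulsar-client distribution:", version)
    print("_pulsar extension loaded from:", module_origin("_pulsar"))
    if version is not None and version_tuple(version) < (3, 1, 0):
        print("older than 3.1.0, which per this thread lacks the #87 fix")
```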

jvstein commented 1 year ago

Thanks for linking #84. I had read that previously but couldn't find it anymore.

I'm definitely using pulsar-client==3.1.0, which is why I thought #87 might not be a full solution.

```
$ pip freeze
apache-bookkeeper-client==4.16.1
certifi==2023.5.7
fastavro==0.24.0
grpcio==1.27.2
prometheus-client==0.16.0
protobuf==3.20.3
pulsar-client==3.1.0
pymmh3==0.0.5
pytz==2023.3
ratelimit==2.2.1
six==1.16.0
```

I just spun up a local standalone Pulsar instance in Docker and can't reproduce against it. I can still reproduce against my Kubernetes deployment (3 ZooKeeper, 4 bookies, 3 brokers, 1 recovery, 3 proxies) based on the Helm chart, which I now see runs Pulsar 2.10.2, not 3.0.0.

merlimat commented 1 year ago

Can you get a gdb stack trace when the process is deadlocked?

jvstein commented 1 year ago

Sure.

bt output:

```
#0 futex_wait (private=0, expected=2, futex_word=0x7ffff0002eb0) at ../sysdeps/nptl/futex-internal.h:146
#1 __GI___lll_lock_wait (futex=futex@entry=0x7ffff0002eb0, private=0) at ./nptl/lowlevellock.c:49
#2 0x00007ffff7d20082 in lll_mutex_lock_optimized (mutex=0x7ffff0002eb0) at ./nptl/pthread_mutex_lock.c:48
#3 ___pthread_mutex_lock (mutex=0x7ffff0002eb0) at ./nptl/pthread_mutex_lock.c:93
#4 0x00007ffff692be05 in pulsar::ProducerImpl::flushAsync(std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#5 0x00007ffff6915e03 in pulsar::PartitionedProducerImpl::flushAsync(std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#6 0x00007ffff6924aaf in pulsar::Producer::flushAsync(std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#7 0x00007ffff71970b8 in std::_Function_handler), Producer_flush(pulsar::Producer&)::{lambda(std::function)#1}>::_M_invoke(std::_Any_data const&, std::function&&) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#8 0x00007ffff71a3a19 in waitForAsyncResult(std::function)>) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#9 0x00007ffff71968f7 in Producer_flush(pulsar::Producer&) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#10 0x00007ffff7199d83 in pybind11::cpp_function::initialize(void (*&)(pulsar::Producer&), void (*)(pulsar::Producer&), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&, char const (&) [108])::{lambda(pybind11::detail::function_call&)#3}::_FUN(pybind11::detail::function_call&) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#11 0x00007ffff71459e7 in pybind11::cpp_function::dispatcher(_object*, _object*, _object*) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#12 0x0000555555776563 in cfunction_call (func=0x7ffff6419090, args=, kwargs=) at Objects/methodobject.c:543
#13 0x00005555555c7bdc in _PyObject_MakeTpCall (tstate=0x5555559247f0, callable=0x7ffff6419090, args=, nargs=, keywords=0x0) at Objects/call.c:191
#14 0x00005555557617c2 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=1, args=0x7ffff7220b78, callable=0x7ffff6419090, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:116
#15 _PyObject_VectorcallTstate (kwnames=0x0, nargsf=1, args=0x7ffff7220b78, callable=0x7ffff6419090, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:103
#16 method_vectorcall (method=, args=0x7ffff7220b80, nargsf=, kwnames=0x0) at Objects/classobject.c:53
#17 0x00005555555b7af8 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=, args=0x7ffff7220b80, callable=0x7ffff64204c0, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:118
#18 PyObject_Vectorcall (kwnames=0x0, nargsf=, args=0x7ffff7220b80, callable=0x7ffff64204c0) at ./Include/cpython/abstract.h:127
#19 call_function (kwnames=0x0, oparg=, pp_stack=, tstate=) at Python/ceval.c:5077
#20 _PyEval_EvalFrameDefault (tstate=, f=, throwflag=) at Python/ceval.c:3489
#21 0x00005555555aff1b in _PyEval_EvalFrame (throwflag=0, f=0x7ffff7220a00, tstate=0x5555559247f0) at ./Include/internal/pycore_ceval.h:40
#22 function_code_fastcall (tstate=0x5555559247f0, co=, args=, nargs=1, globals=) at Objects/call.c:330
#23 0x00005555555b84a2 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=, args=0x5555559a3680, callable=0x7ffff6226af0, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:118
#24 PyObject_Vectorcall (kwnames=0x0, nargsf=, args=0x5555559a3680, callable=0x7ffff6226af0) at ./Include/cpython/abstract.h:127
#25 call_function (kwnames=0x0, oparg=, pp_stack=, tstate=) at Python/ceval.c:5077
#26 _PyEval_EvalFrameDefault (tstate=, f=, throwflag=) at Python/ceval.c:3506
#27 0x0000555555678ad4 in _PyEval_EvalFrame (throwflag=0, f=0x5555559a34c0, tstate=0x5555559247f0) at ./Include/internal/pycore_ceval.h:40
#28 _PyEval_EvalCode (tstate=tstate@entry=0x5555559247f0, _co=_co@entry=0x7ffff75d93a0, globals=, locals=locals@entry=0x0, args=, argcount=0, kwnames=0x0, kwargs=0x5555559777f0, kwcount=0, kwstep=1, defs=0x0, defcount=0, kwdefs=0x0, closure=0x0, name=0x7ffff75d7c70, qualname=0x7ffff75d7c70) at Python/ceval.c:4329
#29 0x00005555555c8dce in _PyFunction_Vectorcall (func=, stack=, nargsf=, kwnames=) at Objects/call.c:396
#30 0x00005555555b6955 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=, args=0x5555559777f0, callable=0x7ffff7687160, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:118
#31 PyObject_Vectorcall (kwnames=0x0, nargsf=, args=0x5555559777f0, callable=0x7ffff7687160) at ./Include/cpython/abstract.h:127
#32 call_function (kwnames=0x0, oparg=, pp_stack=, tstate=) at Python/ceval.c:5077
#33 _PyEval_EvalFrameDefault (tstate=, f=, throwflag=) at Python/ceval.c:3520
#34 0x0000555555678ad4 in _PyEval_EvalFrame (throwflag=0, f=0x555555977680, tstate=0x5555559247f0) at ./Include/internal/pycore_ceval.h:40
#35 _PyEval_EvalCode (tstate=0x5555559247f0, _co=_co@entry=0x7ffff75e10e0, globals=globals@entry=0x7ffff7645a40, locals=locals@entry=0x7ffff7645a40, args=args@entry=0x0, argcount=argcount@entry=0, kwnames=0x0, kwargs=0x0, kwcount=0, kwstep=2, defs=0x0, defcount=0, kwdefs=0x0, closure=0x0, name=0x0, qualname=0x0) at Python/ceval.c:4329
#36 0x0000555555678dfa in _PyEval_EvalCodeWithName (qualname=0x0, name=0x0, closure=0x0, kwdefs=0x0, defcount=0, defs=0x0, kwstep=2, kwcount=0, kwargs=0x0, kwnames=0x0, argcount=0, args=0x0, locals=0x7ffff7645a40, globals=0x7ffff7645a40, _co=0x7ffff75e10e0) at Python/ceval.c:4361
#37 PyEval_EvalCodeEx (closure=0x0, kwdefs=0x0, defcount=0, defs=0x0, kwcount=0, kws=0x0, argcount=0, args=0x0, locals=0x7ffff7645a40, globals=0x7ffff7645a40, _co=0x7ffff75e10e0) at Python/ceval.c:4377
#38 PyEval_EvalCode (co=co@entry=0x7ffff75e10e0, globals=globals@entry=0x7ffff7645a40, locals=locals@entry=0x7ffff7645a40) at Python/ceval.c:828
#39 0x00005555556b908c in run_eval_code_obj (locals=0x7ffff7645a40, globals=0x7ffff7645a40, co=0x7ffff75e10e0, tstate=0x5555559247f0) at Python/pythonrun.c:1221
#40 run_mod (mod=, filename=filename@entry=0x7ffff76b9570, globals=globals@entry=0x7ffff7645a40, locals=locals@entry=0x7ffff7645a40, flags=flags@entry=0x7fffffffc868, arena=arena@entry=0x7ffff7693910) at Python/pythonrun.c:1242
#41 0x00005555556bb0d0 in pyrun_file (flags=0x7fffffffc868, closeit=, locals=0x7ffff7645a40, globals=0x7ffff7645a40, start=257, filename=0x7ffff76b9570, fp=0x5555559251e0) at Python/pythonrun.c:1140
#42 pyrun_simple_file (flags=0x7fffffffc868, closeit=, filename=0x7ffff76b9570, fp=0x5555559251e0) at Python/pythonrun.c:450
#43 PyRun_SimpleFileExFlags (fp=fp@entry=0x5555559251e0, filename=, closeit=closeit@entry=1, flags=flags@entry=0x7fffffffc868) at Python/pythonrun.c:483
#44 0x00005555556bb6cc in PyRun_AnyFileExFlags (fp=fp@entry=0x5555559251e0, filename=, closeit=closeit@entry=1, flags=flags@entry=0x7fffffffc868) at Python/pythonrun.c:92
#45 0x00005555555babef in pymain_run_file (cf=0x7fffffffc868, config=0x555555925ef0) at Modules/main.c:373
#46 pymain_run_python (exitcode=exitcode@entry=0x7fffffffc990) at Modules/main.c:598
#47 0x00005555555bb290 in Py_RunMain () at Modules/main.c:677
#48 pymain_main (args=0x7fffffffc950) at Modules/main.c:707
#49 Py_BytesMain (argc=, argv=) at Modules/main.c:731
#50 0x00007ffff7cb1d90 in __libc_start_call_main (main=main@entry=0x5555555afeb0, argc=argc@entry=2, argv=argv@entry=0x7fffffffcad8) at ../sysdeps/nptl/libc_start_call_main.h:58
#51 0x00007ffff7cb1e40 in __libc_start_main_impl (main=0x5555555afeb0, argc=2, argv=0x7fffffffcad8, init=, fini=, rtld_fini=, stack_end=0x7fffffffcac8) at ../csu/libc-start.c:392
#52 0x00005555555b9e75 in _start ()
```
info threads output:

```
  Id Target Id Frame
* 1  Thread 0x7ffff7c85740 (LWP 1062252) "python" futex_wait (private=0, expected=2, futex_word=0x7ffff0002eb0) at ../sysdeps/nptl/futex-internal.h:146
  2  Thread 0x7ffff612d640 (LWP 1062253) "python" __futex_abstimed_wait_common64 (private=1, cancel=true, abstime=0x7ffff612c120, op=137, expected=0, futex_word=0x555555919788 <_PyRuntime+424>) at ./nptl/futex-internal.c:57
  3  Thread 0x7ffff592c640 (LWP 1062254) "python" __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x555555b0b6ac) at ./nptl/futex-internal.c:57
  4  Thread 0x7ffff5101640 (LWP 1062255) "python" 0x00007ffff7dadfde in epoll_wait (epfd=8, events=0x7ffff51004f0, maxevents=128, timeout=-1) at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
```
merlimat commented 1 year ago

Can you also get the bt all to see all 4 threads?
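The same output can be collected non-interactively by attaching gdb to the hung process in batch mode. This sketch only wraps the gdb command line: -p (attach to a PID), -batch and -ex (run a command) are standard gdb options, while the helper names are hypothetical. On Ubuntu, attaching to a process that is not a child of gdb may require root or a relaxed kernel.yama.ptrace_scope setting.

```python
import subprocess


def gdb_backtrace_cmd(pid, all_threads=True):
    """Build a non-interactive gdb command line that attaches to `pid`."""
    cmd = ["gdb", "-p", str(pid), "-batch"]
    if all_threads:
        # The commands used in this thread: list threads, then dump every stack.
        cmd += ["-ex", "info threads", "-ex", "thread apply all bt"]
    else:
        cmd += ["-ex", "bt"]  # only the currently selected thread
    return cmd


def capture_backtrace(pid, all_threads=True, timeout=120):
    """Attach gdb to a hung process and return the text it prints.

    gdb detaches when the batch commands finish; the target keeps running."""
    result = subprocess.run(
        gdb_backtrace_cmd(pid, all_threads),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return result.stdout
```

For example, print(capture_backtrace(12345)) for a hung process whose PID is 12345 (a placeholder value).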

jvstein commented 1 year ago

Attached. For what it's worth, this is the stack from my real code that's deadlocking, not the exact example above. The fact that I'm using an Avro schema might be relevant, since there is likely some additional communication with the schema registry. The schema itself isn't complicated (five fields, no significant nesting) and has been used on the cluster before.

```python
import pulsar
from pulsar.schema import AvroSchema
from fastavro.schema import load_schema  # assumption: load_schema comes from fastavro

schema_file = "/path/to/my.avsc"
schema_definition = load_schema(schema_file)
schema = AvroSchema(None, schema_definition=schema_definition)
client = pulsar.Client('pulsar://my-pulsar-host:6650')

producer = client.create_producer(
    topic="persistent://tenant/namespace/topic",
    schema=schema,
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10
)
```
thread apply all bt output:

```
Thread 4 (Thread 0x7ffff48ad640 (LWP 1067857) "python"):
#0 0x00007ffff7dadfde in epoll_wait (epfd=8, events=0x7ffff48ac4f0, maxevents=128, timeout=-1) at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
#1 0x00007ffff682dad5 in boost::asio::detail::epoll_reactor::run(long, boost::asio::detail::op_queue&) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#2 0x00007ffff6836443 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#3 0x00007ffff68c325b in pulsar::ExecutorService::start()::{lambda()#1}::operator()() const () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#4 0x00007ffff6e4b650 in execute_native_thread_routine () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#5 0x00007ffff7d1cb43 in start_thread (arg=) at ./nptl/pthread_create.c:442
#6 0x00007ffff7daea00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

Thread 3 (Thread 0x7ffff50ae640 (LWP 1067856) "python"):
#0 __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x555555bd163c) at ./nptl/futex-internal.c:57
#1 __futex_abstimed_wait_common (cancel=true, private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x555555bd163c) at ./nptl/futex-internal.c:87
#2 __GI___futex_abstimed_wait_cancelable64 (futex_word=futex_word@entry=0x555555bd163c, expected=expected@entry=0, clockid=clockid@entry=0, abstime=abstime@entry=0x0, private=private@entry=0) at ./nptl/futex-internal.c:139
#3 0x00007ffff7d1bac1 in __pthread_cond_wait_common (abstime=0x0, clockid=0, mutex=0x555555bd15d8, cond=0x555555bd1610) at ./nptl/pthread_cond_wait.c:503
#4 ___pthread_cond_wait (cond=0x555555bd1610, mutex=0x555555bd15d8) at ./nptl/pthread_cond_wait.c:627
#5 0x00007ffff68365bc in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#6 0x00007ffff6836b3a in boost::asio::detail::posix_thread::func::run() () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#7 0x00007ffff682cdf0 in boost_asio_detail_posix_thread_function () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#8 0x00007ffff7d1cb43 in start_thread (arg=) at ./nptl/pthread_create.c:442
#9 0x00007ffff7daea00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

Thread 2 (Thread 0x7ffff58af640 (LWP 1067855) "python"):
#0 __futex_abstimed_wait_common64 (private=1, cancel=true, abstime=0x7ffff58ae120, op=137, expected=0, futex_word=0x555555919788 <_PyRuntime+424>) at ./nptl/futex-internal.c:57
#1 __futex_abstimed_wait_common (cancel=true, private=1, abstime=0x7ffff58ae120, clockid=1435604832, expected=0, futex_word=0x555555919788 <_PyRuntime+424>) at ./nptl/futex-internal.c:87
#2 __GI___futex_abstimed_wait_cancelable64 (futex_word=futex_word@entry=0x555555919788 <_PyRuntime+424>, expected=expected@entry=0, clockid=clockid@entry=1, abstime=abstime@entry=0x7ffff58ae120, private=private@entry=0) at ./nptl/futex-internal.c:139
#3 0x00007ffff7d1bf1b in __pthread_cond_wait_common (abstime=0x7ffff58ae120, clockid=1, mutex=0x555555919790 <_PyRuntime+432>, cond=0x555555919760 <_PyRuntime+384>) at ./nptl/pthread_cond_wait.c:503
#4 ___pthread_cond_timedwait64 (cond=cond@entry=0x555555919760 <_PyRuntime+384>, mutex=mutex@entry=0x555555919790 <_PyRuntime+432>, abstime=abstime@entry=0x7ffff58ae120) at ./nptl/pthread_cond_wait.c:652
#5 0x000055555567704f in PyCOND_TIMEDWAIT (us=, mut=0x555555919790 <_PyRuntime+432>, cond=0x555555919760 <_PyRuntime+384>) at Python/condvar.h:73
#6 take_gil (tstate=tstate@entry=0x7ffff002b290) at Python/ceval_gil.h:247
#7 0x0000555555677a12 in PyEval_AcquireThread (tstate=0x7ffff002b290) at Python/ceval.c:385
#8 0x00007ffff713f5d5 in pybind11::gil_scoped_acquire::gil_scoped_acquire() () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#9 0x00007ffff7199cbc in std::_Function_handler, void>::load(pybind11::handle, bool)::func_wrapper>::_M_manager(std::_Any_data&, std::_Any_data const&, std::_Manager_operation) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#10 0x00007ffff6934cb2 in std::function::function(std::function const&) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#11 0x00007ffff692a5cc in std::_Function_handler)::{lambda(pulsar::Result, pulsar::MessageId const&)#1}>::_M_manager(std::_Any_data&, std::_Any_data const&, std::_Manager_operation) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#12 0x00007ffff68de83f in pulsar::MessageAndCallbackBatch::createSendCallback() const () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#13 0x00007ffff67ff43d in pulsar::BatchMessageContainerBase::createOpSendMsgHelper(pulsar::OpSendMsg&, std::function const&, pulsar::MessageAndCallbackBatch const&) const () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#14 0x00007ffff680075c in pulsar::BatchMessageContainerBase::processAndClear(std::function, std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#15 0x00007ffff692ab88 in pulsar::ProducerImpl::batchMessageAndSend(std::function const&) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#16 0x00007ffff692b215 in pulsar::ProducerImpl::sendAsyncWithStatsUpdate(pulsar::Message const&, std::function const&)::{lambda(boost::system::error_code const&)#2}::operator()(boost::system::error_code const&) const () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#17 0x00007ffff692bb85 in boost::asio::detail::wait_handler const&)::{lambda(boost::system::error_code const&)#2}, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#18 0x00007ffff68367a2 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#19 0x00007ffff68c325b in pulsar::ExecutorService::start()::{lambda()#1}::operator()() const () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#20 0x00007ffff6e4b650 in execute_native_thread_routine () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#21 0x00007ffff7d1cb43 in start_thread (arg=) at ./nptl/pthread_create.c:442
#22 0x00007ffff7daea00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

Thread 1 (Thread 0x7ffff7c85740 (LWP 1067852) "python"):
#0 futex_wait (private=0, expected=2, futex_word=0x7ffff0002eb0) at ../sysdeps/nptl/futex-internal.h:146
#1 __GI___lll_lock_wait (futex=futex@entry=0x7ffff0002eb0, private=0) at ./nptl/lowlevellock.c:49
#2 0x00007ffff7d20082 in lll_mutex_lock_optimized (mutex=0x7ffff0002eb0) at ./nptl/pthread_mutex_lock.c:48
#3 ___pthread_mutex_lock (mutex=0x7ffff0002eb0) at ./nptl/pthread_mutex_lock.c:93
#4 0x00007ffff692be05 in pulsar::ProducerImpl::flushAsync(std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#5 0x00007ffff6915e03 in pulsar::PartitionedProducerImpl::flushAsync(std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#6 0x00007ffff6924aaf in pulsar::Producer::flushAsync(std::function) () from /tmp/.venv/lib/python3.9/site-packages/pulsar_client.libs/libpulsar-df34d1a6.so
#7 0x00007ffff71970b8 in std::_Function_handler), Producer_flush(pulsar::Producer&)::{lambda(std::function)#1}>::_M_invoke(std::_Any_data const&, std::function&&) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#8 0x00007ffff71a3a19 in waitForAsyncResult(std::function)>) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#9 0x00007ffff71968f7 in Producer_flush(pulsar::Producer&) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#10 0x00007ffff7199d83 in pybind11::cpp_function::initialize(void (*&)(pulsar::Producer&), void (*)(pulsar::Producer&), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&, char const (&) [108])::{lambda(pybind11::detail::function_call&)#3}::_FUN(pybind11::detail::function_call&) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#11 0x00007ffff71459e7 in pybind11::cpp_function::dispatcher(_object*, _object*, _object*) () from /tmp/.venv/lib/python3.9/site-packages/_pulsar.cpython-39-x86_64-linux-gnu.so
#12 0x0000555555776563 in cfunction_call (func=0x7ffff64190e0, args=, kwargs=) at Objects/methodobject.c:543
#13 0x00005555555c7bdc in _PyObject_MakeTpCall (tstate=0x5555559247f0, callable=0x7ffff64190e0, args=, nargs=, keywords=0x0) at Objects/call.c:191
#14 0x00005555557617c2 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=1, args=0x7ffff58b39d8, callable=0x7ffff64190e0, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:116
#15 _PyObject_VectorcallTstate (kwnames=0x0, nargsf=1, args=0x7ffff58b39d8, callable=0x7ffff64190e0, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:103
#16 method_vectorcall (method=, args=0x7ffff58b39e0, nargsf=, kwnames=0x0) at Objects/classobject.c:53
#17 0x00005555555b7af8 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=, args=0x7ffff58b39e0, callable=0x7ffff5914a00, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:118
#18 PyObject_Vectorcall (kwnames=0x0, nargsf=, args=0x7ffff58b39e0, callable=0x7ffff5914a00) at ./Include/cpython/abstract.h:127
#19 call_function (kwnames=0x0, oparg=, pp_stack=, tstate=) at Python/ceval.c:5077
#20 _PyEval_EvalFrameDefault (tstate=, f=, throwflag=) at Python/ceval.c:3489
#21 0x00005555555aff1b in _PyEval_EvalFrame (throwflag=0, f=0x7ffff58b3860, tstate=0x5555559247f0) at ./Include/internal/pycore_ceval.h:40
#22 function_code_fastcall (tstate=0x5555559247f0, co=, args=, nargs=1, globals=) at Objects/call.c:330
#23 0x00005555555b84a2 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=, args=0x7ffff5915bf8, callable=0x7ffff6227b80, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:118
#24 PyObject_Vectorcall (kwnames=0x0, nargsf=, args=0x7ffff5915bf8, callable=0x7ffff6227b80) at ./Include/cpython/abstract.h:127
#25 call_function (kwnames=0x0, oparg=, pp_stack=, tstate=) at Python/ceval.c:5077
#26 _PyEval_EvalFrameDefault (tstate=, f=, throwflag=) at Python/ceval.c:3506
#27 0x0000555555678ad4 in _PyEval_EvalFrame (throwflag=0, f=0x7ffff5915a40, tstate=0x5555559247f0) at ./Include/internal/pycore_ceval.h:40
#28 _PyEval_EvalCode (tstate=tstate@entry=0x5555559247f0, _co=_co@entry=0x7ffff75d93a0, globals=, locals=locals@entry=0x0, args=, argcount=0, kwnames=0x0, kwargs=0x555555977920, kwcount=0, kwstep=1, defs=0x0, defcount=0, kwdefs=0x0, closure=0x0, name=0x7ffff756fdb0, qualname=0x7ffff756fdb0) at Python/ceval.c:4329
#29 0x00005555555c8dce in _PyFunction_Vectorcall (func=, stack=, nargsf=, kwnames=) at Objects/call.c:396
#30 0x00005555555b6955 in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=, args=0x555555977920, callable=0x7ffff7687160, tstate=0x5555559247f0) at ./Include/cpython/abstract.h:118
#31 PyObject_Vectorcall (kwnames=0x0, nargsf=, args=0x555555977920, callable=0x7ffff7687160) at ./Include/cpython/abstract.h:127
#32 call_function (kwnames=0x0, oparg=, pp_stack=, tstate=) at Python/ceval.c:5077
#33 _PyEval_EvalFrameDefault (tstate=, f=, throwflag=) at Python/ceval.c:3520
#34 0x0000555555678ad4 in _PyEval_EvalFrame (throwflag=0, f=0x5555559777b0, tstate=0x5555559247f0) at ./Include/internal/pycore_ceval.h:40
#35 _PyEval_EvalCode (tstate=0x5555559247f0, _co=_co@entry=0x7ffff75e10e0, globals=globals@entry=0x7ffff7644a80, locals=locals@entry=0x7ffff7644a80, args=args@entry=0x0, argcount=argcount@entry=0, kwnames=0x0, kwargs=0x0, kwcount=0, kwstep=2, defs=0x0, defcount=0, kwdefs=0x0, closure=0x0, name=0x0, qualname=0x0) at Python/ceval.c:4329
#36 0x0000555555678dfa in _PyEval_EvalCodeWithName (qualname=0x0, name=0x0, closure=0x0, kwdefs=0x0, defcount=0, defs=0x0, kwstep=2, kwcount=0, kwargs=0x0, kwnames=0x0, argcount=0, args=0x0, locals=0x7ffff7644a80, globals=0x7ffff7644a80, _co=0x7ffff75e10e0) at Python/ceval.c:4361
#37 PyEval_EvalCodeEx (closure=0x0, kwdefs=0x0, defcount=0, defs=0x0, kwcount=0, kws=0x0, argcount=0, args=0x0, locals=0x7ffff7644a80, globals=0x7ffff7644a80, _co=0x7ffff75e10e0) at Python/ceval.c:4377
#38 PyEval_EvalCode (co=co@entry=0x7ffff75e10e0, globals=globals@entry=0x7ffff7644a80, locals=locals@entry=0x7ffff7644a80) at Python/ceval.c:828
#39 0x00005555556b908c in run_eval_code_obj (locals=0x7ffff7644a80, globals=0x7ffff7644a80, co=0x7ffff75e10e0, tstate=0x5555559247f0) at Python/pythonrun.c:1221
#40 run_mod (mod=, filename=filename@entry=0x7ffff76b9570, globals=globals@entry=0x7ffff7644a80, locals=locals@entry=0x7ffff7644a80, flags=flags@entry=0x7fffffffc7d8, arena=arena@entry=0x7ffff7693910) at Python/pythonrun.c:1242
#41 0x00005555556bb0d0 in pyrun_file (flags=0x7fffffffc7d8, closeit=, locals=0x7ffff7644a80, globals=0x7ffff7644a80, start=257, filename=0x7ffff76b9570, fp=0x555555925310) at Python/pythonrun.c:1140
#42 pyrun_simple_file (flags=0x7fffffffc7d8, closeit=, filename=0x7ffff76b9570, fp=0x555555925310) at Python/pythonrun.c:450
#43 PyRun_SimpleFileExFlags (fp=fp@entry=0x555555925310, filename=, closeit=closeit@entry=1, flags=flags@entry=0x7fffffffc7d8) at Python/pythonrun.c:483
#44 0x00005555556bb6cc in PyRun_AnyFileExFlags (fp=fp@entry=0x555555925310, filename=, closeit=closeit@entry=1, flags=flags@entry=0x7fffffffc7d8) at Python/pythonrun.c:92
#45 0x00005555555babef in pymain_run_file (cf=0x7fffffffc7d8, config=0x555555926020) at Modules/main.c:373
#46 pymain_run_python (exitcode=exitcode@entry=0x7fffffffc900) at Modules/main.c:598
#47 0x00005555555bb290 in Py_RunMain () at Modules/main.c:677
#48 pymain_main (args=0x7fffffffc8c0) at Modules/main.c:707
#49 Py_BytesMain (argc=, argv=) at Modules/main.c:731
#50 0x00007ffff7cb1d90 in __libc_start_call_main (main=main@entry=0x5555555afeb0, argc=argc@entry=2, argv=argv@entry=0x7fffffffca48) at ../sysdeps/nptl/libc_start_call_main.h:58
#51 0x00007ffff7cb1e40 in __libc_start_main_impl (main=0x5555555afeb0, argc=2, argv=0x7fffffffca48, init=, fini=, rtld_fini=, stack_end=0x7fffffffca38) at ../csu/libc-start.c:392
#52 0x00005555555b9e75 in _start ()
```
merlimat commented 1 year ago

Gotcha, it looks like a deadlock between threads 1 and 2.

There's a place where the Python GIL is not released before making the Pulsar C++ producer.flushAsync() call.

Getting a fix out.
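The interleaving in the two stacks can be modeled as a toy example in pure Python. Thread 1 is inside Producer_flush and ProducerImpl::flushAsync, blocked in pthread_mutex_lock while still holding the GIL; thread 2 is inside ProducerImpl::batchMessageAndSend and MessageAndCallbackBatch::createSendCallback, where copying the pybind11-wrapped Python callback enters gil_scoped_acquire and blocks in take_gil. The sketch assumes thread 2 already holds the producer mutex at that point (this is what the diagnosis implies; the trace does not show it directly) and uses two plain threading.Lock objects to stand in for the GIL and that mutex. Lock waits use timeouts so the demo returns instead of hanging. None of this is pulsar-client code, and run_scenario is a hypothetical name.

```python
import threading


def run_scenario(release_gil_before_flush, timeout=0.5):
    """Toy model of threads 1 and 2; True if both threads got the lock they wanted."""
    gil = threading.Lock()             # stands in for the Python GIL
    producer_mutex = threading.Lock()  # stands in for the C++ producer's mutex
    main_holds_gil = threading.Event()
    io_holds_mutex = threading.Event()
    results = {}

    def io_thread():
        # Thread 2: the batch timer fires and takes the producer mutex first...
        main_holds_gil.wait()
        with producer_mutex:
            io_holds_mutex.set()
            # ...then copying the Python send callback needs the GIL.
            got_gil = gil.acquire(timeout=timeout)
            if got_gil:
                gil.release()
            results["io"] = got_gil

    def main_thread():
        # Thread 1: Python code holds the GIL and calls flush().
        gil.acquire()
        main_holds_gil.set()
        io_holds_mutex.wait()
        if release_gil_before_flush:
            gil.release()  # drop the GIL before blocking in native code
            got_mutex = producer_mutex.acquire(timeout=timeout)
        else:
            got_mutex = producer_mutex.acquire(timeout=timeout)  # GIL still held
            gil.release()
        if got_mutex:
            producer_mutex.release()
        results["main"] = got_mutex

    threads = [threading.Thread(target=f) for f in (io_thread, main_thread)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results["io"] and results["main"]
```

With the GIL held across the mutex wait, neither thread can proceed until one wait times out, so run_scenario(False) returns False. Releasing the GIL before blocking, which is the change described above, lets both threads finish and run_scenario(True) returns True.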

jvstein commented 1 year ago

Cool. I was finally able to reproduce it with the example code above against a Docker-based standalone cluster, but only with a partitioned topic that has at least 2 partitions.
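Reproducing against a standalone cluster therefore requires creating the partitioned topic first. Besides the pulsar-admin command in the repro's comment, the broker admin REST API can do this. The sketch below only builds the request; it assumes the standalone default admin URL http://localhost:8080 and the PUT /admin/v2/{persistent|non-persistent}/{tenant}/{namespace}/{topic}/partitions endpoint with the partition count as the JSON body. The helper name is hypothetical.

```python
import urllib.request


def create_partitioned_topic_request(admin_url, topic, partitions):
    """Build (but do not send) a PUT request that creates a partitioned topic."""
    scheme, sep, rest = topic.partition("://")
    if not sep:
        raise ValueError("expected a fully qualified topic, e.g. persistent://tenant/ns/name")
    tenant, namespace, name = rest.split("/", 2)
    url = f"{admin_url.rstrip('/')}/admin/v2/{scheme}/{tenant}/{namespace}/{name}/partitions"
    # The body is the partition count encoded as a JSON number.
    return urllib.request.Request(
        url,
        data=str(partitions).encode(),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
```

Sending it is then urllib.request.urlopen(create_partitioned_topic_request("http://localhost:8080", "persistent://public/default/python-client-122", 2)).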

merlimat commented 1 year ago

@jvstein thanks for the debug info. I've posted a fix and pushed a test tag to build wheel files with it.

Once they're done, you can try with one of the wheels from: https://github.com/merlimat/pulsar-client-python/actions/runs/5073247045