apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0
51 stars 39 forks source link

Client is highly fork-unsafe: any usage of multiprocessing deadlocks if an at-fork hook triggers garbage collection of a Pulsar client #127

Open zbentley opened 1 year ago

zbentley commented 1 year ago

Context:

Bug description:

It's common practice to clear out global connection cached objects after a fork takes place. That's typically done via the os.register_at_fork method in Python.

Callbacks to that method are similar to signal handlers: they shouldn't do anything complicated or IO-ful. However something very common to do in an atfork callback is to wipe out a global connection cache, setting it to None in the child process to indicate that a new connection should be opened if needed.

We do that with our Pulsar client connections. Since the Pulsar client isn't fork safe at all, we wipe out our global references to it whenever we fork via register_at_fork handlers (and our code runs in tons of our internal services that make heavy use of Python forking; this is an extremely common practice in large Python installs: gunicorn, Celery, multiprocessing, billiard, airflow ... all heavily use fork-based concurrency to encapsulate workloads, share expensive-to-load-and-huge pre-forked memory, and evade the GIL).

The most common forking system is multiprocessing, which provides a "run some work concurrency then wrap up" interface.

We observed that arbitrary multiprocessing workloads across our fleet occasionally hang. Workloads that very frequently invoke multiprocessing pools after publishing to Pulsar were more likely to hang.

After some painful production incidents, we arrived at the below reproduction, which indicates an internal Pulsar client deadlock that occurs when a global connection cache is cleared out, triggering destruction inside the Pulsar-client C++ code.

Note that this occurs on any invocation of multiprocessing in the presence of a Pulsar connection that is cleared by an atfork hook, regardless of whether the multiprocessing code, or the parent process, will ever use Pulsar again. This is highly unfortunate.

Steps to Reproduce

  1. Run a standalone pulsar broker on localhost:6650.
  2. Put the below Python code in a repro.py file. Optionally, edit the test topic name to use a topic/namespace/tenant that exist for you locally.
  3. Do while true; do python repro.py; done to repeatedly invoke it.
  4. Wait ten minutes.

Snippet:

import time
from multiprocessing import Pool, set_start_method
import pulsar
import os

CLIENT = None
PROD = None

def do_send():
    global CLIENT
    global PROD
    if PROD is None:
        CLIENT = pulsar.Client(service_url='pulsar://localhost:6650')
        PROD = CLIENT.create_producer(topic='persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test')
    PROD.send(b"123")

def clear_connection_cache():
    global CLIENT
    global PROD
    print("Destroying connections", flush=True)
    CLIENT = PROD = None
    print("Destroyed connections", flush=True)

def main():
    os.register_at_fork(after_in_child=clear_connection_cache)
    do_send()
    processes = 20
    set_start_method('fork')
    pool = Pool(
        processes=processes,
    )

    try:
        pool.map_async(
            time.sleep, [0.1 for _ in range(processes)]
        ).get(1)
    finally:
        pool.close()
        print("Joining pool", flush=True)
        pool.join()
        print("Joined pool", flush=True)

if __name__ == '__main__':
    main()

Expected behavior

The code should continue successfully being invoked over and over after ten minutes.

More to the point all invocations of Pulsar client functionality, including destruction, should be "safe" to perform in all contexts: forks, threads, asyncio coroutines, trio coroutines, multiprocessing, Billiard, pthread_atfork hooks, signal handlers, etc. "Safe" might mean "works" or it might mean "errors immediately saying 'this is not allowed'". But hanging forever is not safe.

Observed behavior

After some number of iterations, usually <1min, the code hangs in a pool-join timeout.

When that happens, the number of Destroying connections log lines and Destroyed connections log lines will not match up, indicating that one call to CLIENT = PROD = None is blocking.

Root cause

When the test code hangs, attaching a py-spy stack printer (with --threads --native) to the hung subprocess indicates that it is stuck in what appears to be code related to the client's destructor. The code in _after_fork_child in the below stack is exactly analogous to the clear_connection_cache code in the above snippet:

image
BewareMyPower commented 1 year ago

Unfortunately this script cannot reproduce the deadlock in my local env. I started a Pulsar standalone 2.11.1 and installed the Python client 3.1.0 on Ubuntu 22.04 with Python 3.8. Then I just modified the topic name to my-topic and run while true; do python3 repro.py; done. The loop has been running for about 30 minutes and it still worked well.

2023-06-01T19:36:56,301+0800 [pulsar-io-19-11] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:46530] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-topic}, client=/127.0.0.1:46530, producerName=standalone-0-2414, producerId=0}

As you can see the suffix of the automatically generated producer name is 2414 so the script has been executed for 2000+ times.

From the stack you provided, it's stuck at ClientConnection::closeSocket when clearing the connections in the pool. Normally, it should just fail with ConnectError. Could you provide some latest logs when the deadlock happened in your environment so I can see the difference?

Though I think this issue should still exist for the latest code, could you also help verify the 3.2.0 candidate 1? https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-python-3.1.0/

I believe it's because the C++ destructors have done many unnecessary things in the destructors. We should keep the destructors simple. Here is another issue related to the destructor: https://github.com/apache/pulsar-client-python/issues/103

The simplest solution might be adding an option to skip all shutdown calls for these C++ classes. But it would be better if I can reproduce it locally then I can verify if this fix works.

zbentley commented 1 year ago

backtrace.txt Thanks @BewareMyPower. Here are the logs from a run (MacOS 11, Python 3.10.9, client 3.1.0) that got stuck with 4 processes, connecting explicitly to the topic persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1:

Joining pool
Joined pool
2023-06-01 09:40:21.098 INFO  [0x104424580] ProducerImpl:697 | Producer - [persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1, standalone-36-796] , [batching  = off]
2023-06-01 09:40:21.098 INFO  [0x104424580] ClientConnection:1600 | [[::1]:53830 -> [::1]:6650] Connection closed with ConnectError
2023-06-01 09:40:21.099 INFO  [0x104424580] ClientConnection:269 | [[::1]:53830 -> [::1]:6650] Destroyed connection
2023-06-01 09:40:21.357 INFO  [0x104514580] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-06-01 09:40:21.357 INFO  [0x104514580] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-06-01 09:40:21.359 INFO  [0x16baf3000] ClientConnection:388 | [[::1]:53831 -> [::1]:6650] Connected to broker
2023-06-01 09:40:21.375 INFO  [0x16baf3000] HandlerBase:72 | [persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1, ] Getting connection from pool
2023-06-01 09:40:21.386 INFO  [0x16baf3000] ProducerImpl:202 | [persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1, ] Created producer on broker [[::1]:53831 -> [::1]:6650]
Destroying connections
2023-06-01 09:40:21.402 INFO  [0x104514580] ProducerImpl:697 | Producer - [persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1, standalone-36-797] , [batching  = off]
2023-06-01 09:40:21.402 INFO  [0x104514580] ClientConnection:1600 | [[::1]:53831 -> [::1]:6650] Connection closed with ConnectError
Destroying connections
2023-06-01 09:40:21.403 INFO  [0x104514580] ProducerImpl:697 | Producer - [persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1, standalone-36-797] , [batching  = off]
Destroying connections
2023-06-01 09:40:21.403 INFO  [0x104514580] ClientConnection:1600 | [[::1]:53831 -> [::1]:6650] Connection closed with ConnectError
2023-06-01 09:40:21.403 INFO  [0x104514580] ProducerImpl:697 | Producer - [persistent://chariot1/chariot_ns_sre--kms_test/chariot_topic_kms_test-partition-1, standalone-36-797] , [batching  = off]
2023-06-01 09:40:21.403 INFO  [0x104514580] ClientConnection:1600 | [[::1]:53831 -> [::1]:6650] Connection closed with ConnectError
Destroying connections
Destroyed connections
Destroyed connections
Destroyed connections
Joining pool

An lldb backtrace is attached to this comment. Due to MacOS differences, it looks slightly different than the py-spy backtrace I provided from a Linux host in production, but shows similar defective behavior.

The problem largely appears to be that fork-safe programs that use threads must assume those threads may vanish without informing the rest of the program (that's what pthread_atfork(3) is for). When a threaded program forks-without-execcing, only the thread calling fork(2) exists in the child. All of the other threads vanish, in the midst of whatever they were doing.

To be fair, this is documentedly unsafe behavior according to POSIX, but it's also the everyday reality for the most common Python application harnesses in the world. Most Python isn't single-threaded, nor is it single-process. As a result, drivers loaded by Python programs must assume those programs may fork, at which point threads will vanish.

While this is water far under the bridge at this point for pulsar-client, those realities are one of the reasons why multithreaded drivers are often a problematic design. Since drivers have to work in "hostile environments" (embedded interpreters, forking code, thread-constrained, under-resourced, driver code invoked from signal handlers or atfork hooks, etc). Using multithreading inside a client library might be safe in languages that tend to work in a more uniform "the runtime is the entry point" way, like Go and Java, but in languages like Python that are often run in weird ways and/or messed up environments, it can cause problems. The more robust drivers I've used eschew multithreading internally, even at the cost of more complex usage APIs for end users (e.g. user code must assume the responsibility for "turning" the driver event loop and/or performing heartbeat pings).

zbentley commented 1 year ago

To expand on the design issues: fork safety in pure Python is decently easy to achieve, thanks to the GIL. If a Python program is using multiple Python threading.Threads, and one of those threads calls os.fork(), that's reasonably safe for two reasons:

As soon as another programming language's threaded runtime (c++ threads and boost, in this instance) enters into the mix, though, all bets are off: that runtime likely doesn't have something analogous to the GIL, so the set of things that it can be "in the middle of" when the fork(2) call happens explodes to include "nearly everything", including a lot of stuff that manipulates shared resources such that, if a thread vanishes during fork, other threads won't work (e.g. a background thread held a vital mutex, which is what's happening here).

It's unfortunate historically that Python has the GIL, and as a result has so much forking code. But that's sadly where things are at today. For example, one of the pieces of code affected by the incident that surfaced this bug forks three levels deep, and spawns/reaps forks multiple hundreds of times a second (all of which don't exec, but rather do manipulations of CoW-shared memory). All of that is parallelized across hundreds of invocations, so the probability of hitting fork-unsafety-based race conditions approaches 100%.

There are lots of other "war stories" about this out there:

merlimat commented 1 year ago

I was able to reproduce on 3.1.0 client.

* thread #1, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  * frame #0: 0x000000018577ebc8 libsystem_kernel.dylib`__psynch_mutexwait + 8
    frame #1: 0x00000001857b90c4 libsystem_pthread.dylib`_pthread_mutex_firstfit_lock_wait + 84
    frame #2: 0x00000001857b6a5c libsystem_pthread.dylib`_pthread_mutex_firstfit_lock_slow + 248
    frame #3: 0x00000001074225fc _pulsar.cpython-311-darwin.so`unsigned long boost::asio::detail::kqueue_reactor::cancel_timer<boost::asio::time_traits<boost::posix_time::ptime> >(boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime> >&, boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime> >::per_timer_data&, unsigned long) + 56
    frame #4: 0x0000000107569680 _pulsar.cpython-311-darwin.so`pulsar::ProducerImpl::shutdown() + 260
    frame #5: 0x000000010755daa4 _pulsar.cpython-311-darwin.so`pulsar::ProducerImpl::~ProducerImpl() + 96
    frame #6: 0x0000000107414878 _pulsar.cpython-311-darwin.so`pybind11::class_<pulsar::Producer>::dealloc(pybind11::detail::value_and_holder&) + 132
    frame #7: 0x00000001073b2c6c _pulsar.cpython-311-darwin.so`pybind11::detail::clear_instance(_object*) + 396
    frame #8: 0x00000001073b28dc _pulsar.cpython-311-darwin.so`pybind11_object_dealloc + 20

My impression is that this is not related to the Python wrapper, but rather to the locks state in Boost Asio after the fork. In the case above, it just got stuck while trying to cancel a timer. The lock on the timer was not held by anyone else.

Perhaps it's possible that the timer was being triggered when the fork happened and it left the mutex locked in the child process.

merlimat commented 1 year ago

The above stack trace is different from the first one, although similar in principle. There, we got stuck while closing a socket, for which again the mutex should have had no reason to be taken.

merlimat commented 1 year ago

I believe we would have to inform Asio about the happening fork (https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/io_context/notify_fork.html), though I don't see an immediate easy way to hook it up automatically, in the C++ code.

zbentley commented 1 year ago

@merlimat pthread_atfork has a before-fork hook that could be used for this. However, this stacko appears to indicate that notify_fork may not be sufficient to resolve all issues. I'm not a Boost expert, though, so I'm not quite sure.

merlimat commented 1 year ago

I tried to expose the manual hook in C++ client and call it from Python wrapper. So far I was not successful in not making it crash when

C++ Diff

diff --git include/pulsar/Client.h include/pulsar/Client.h
index 3514934..d648612 100644
--- include/pulsar/Client.h
+++ include/pulsar/Client.h
@@ -46,6 +46,12 @@ class ClientImpl;
 class PulsarFriend;
 class PulsarWrapper;

+enum ForkEvent {
+    ForkPrepare,
+    ForkParent,
+    ForkChild
+};
+
 class PULSAR_PUBLIC Client {
    public:
     /**
@@ -414,6 +420,26 @@ class PULSAR_PUBLIC Client {
     void getSchemaInfoAsync(const std::string& topic, int64_t version,
                             std::function<void(Result, const SchemaInfo&)> callback);

+    /**
+     * Notify the Pulsar client instance about an OS fork() operation.
+     * This will help ensure the state of threads and mutexes is kept consistent
+     * after the fork is completed.
+     *
+     * Example of how to use this:
+     *
+     * <code>
+     * pulsarClient.notifyFork(pulsar::ForkPrepare);
+     * if (fork() == 0) {
+     *     // This is the child process.
+     *     pulsarClient.notifyFork(pulsar::ForkChild);
+     * } else {
+     *     // This is the parent process.
+     *     pulsarClient.NotifyFork(pulsar::ForkParent);
+     * }
+     * </code>
+     */
+    void notifyFork(ForkEvent forkEvent);
+
    private:
     Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
            bool poolConnections);
diff --git lib/Client.cc lib/Client.cc
index 48c4a67..fa92a43 100644
--- lib/Client.cc
+++ lib/Client.cc
@@ -192,6 +192,8 @@ void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); }

 void Client::shutdown() { impl_->shutdown(); }

+void Client::notifyFork(ForkEvent forkEvent) { impl_->notifyFork(forkEvent); }
+
 uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); }
 uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); }

diff --git lib/ClientImpl.cc lib/ClientImpl.cc
index a8fc24c..e912cff 100644
--- lib/ClientImpl.cc
+++ lib/ClientImpl.cc
@@ -772,4 +772,20 @@ std::string ClientImpl::getClientVersion(const ClientConfiguration& clientConfig
     return oss.str();
 }

+void ClientImpl::notifyFork(ForkEvent forkEvent) {
+    Lock lock(mutex_);
+
+//    if (ioExecutorProvider_) {
+//        ioExecutorProvider_->notifyFork(forkEvent);
+//    }
+
+    if (listenerExecutorProvider_) {
+        listenerExecutorProvider_->notifyFork(forkEvent);
+    }
+
+    if (partitionListenerExecutorProvider_) {
+        partitionListenerExecutorProvider_->notifyFork(forkEvent);
+    }
+}
+
 } /* namespace pulsar */
diff --git lib/ClientImpl.h lib/ClientImpl.h
index 9ee7095..ce12fd2 100644
--- lib/ClientImpl.h
+++ lib/ClientImpl.h
@@ -124,6 +124,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

     std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }

+    void notifyFork(ForkEvent forkEvent);
+
     friend class PulsarFriend;

    private:
diff --git lib/ExecutorService.cc lib/ExecutorService.cc
index a0dff0b..fa2829e 100644
--- lib/ExecutorService.cc
+++ lib/ExecutorService.cc
@@ -20,6 +20,7 @@

 #include "LogUtils.h"
 #include "TimeUtils.h"
+#include "Latch.h"
 DECLARE_LOG_OBJECT()

 namespace pulsar {
@@ -157,4 +158,32 @@ void ExecutorServiceProvider::close(long timeoutMs) {
         executor.reset();
     }
 }
+
+
+static void notifyForkOnIoService(ExecutorServicePtr executor, boost::asio::io_service::fork_event event) {
+    Latch l(1);
+
+    executor->postWork([=, &l] {
+        executor->getIOService().notify_fork(event);
+        l.countdown();
+    });
+
+    l.wait();
+}
+
+void ExecutorServiceProvider::notifyFork(ForkEvent forkEvent) {
+    Lock lock(mutex_);
+
+    for (auto& executor : executors_) {
+        switch (forkEvent) {
+            case ForkPrepare:
+                notifyForkOnIoService(executor, boost::asio::io_service::fork_prepare);
+            case ForkParent:
+                notifyForkOnIoService(executor, boost::asio::io_service::fork_parent);
+            case ForkChild:
+                notifyForkOnIoService(executor, boost::asio::io_service::fork_child);
+        }
+    }
+}
+
 }  // namespace pulsar
diff --git lib/ExecutorService.h lib/ExecutorService.h
index 4717ccb..d14de4c 100644
--- lib/ExecutorService.h
+++ lib/ExecutorService.h
@@ -32,6 +32,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <pulsar/Client.h>

 namespace pulsar {
 typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
@@ -93,6 +94,8 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
     // See TimeoutProcessor for the semantics of the parameter.
     void close(long timeoutMs = 3000);

+    void notifyFork(ForkEvent forkEvent);
+
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
     ExecutorList executors_;

Python Diff:

diff --git pulsar/__init__.py pulsar/__init__.py
index dbf3d82..4cb55ad 100644
--- pulsar/__init__.py
+++ pulsar/__init__.py
@@ -58,6 +58,7 @@ from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
 from pulsar import schema
 _schema = schema

+import os
 import re
 _retype = type(re.compile('x'))

@@ -491,6 +492,14 @@ class Client:
         conf.tls_validate_hostname(tls_validate_hostname)
         self._client = _pulsar.Client(service_url, conf)
         self._consumers = []
+        # Prepare to handle forks
+        os.register_at_fork(before=lambda: self._notifyFork(_pulsar.ForkEvent.Prepare),
+                            after_in_child=lambda: self._notifyFork(_pulsar.ForkEvent.Child),
+                            after_in_parent=lambda: self._notifyFork(_pulsar.ForkEvent.Parent))
+
+    def _notifyFork(self, event):
+        print("NOTIFY FORK: %s" % event)
+        self._client.notifyFork(event)

     @staticmethod
     def _prepare_logger(logger):
diff --git src/client.cc src/client.cc
index 626ff9f..2369842 100644
--- src/client.cc
+++ src/client.cc
@@ -79,5 +79,6 @@ void export_client(py::module_& m) {
         .def("get_topic_partitions", &Client_getTopicPartitions)
         .def("get_schema_info", &Client_getSchemaInfo)
         .def("close", &Client_close)
+        .def("notifyFork", &Client::notifyFork)
         .def("shutdown", &Client::shutdown);
 }
diff --git src/enums.cc src/enums.cc
index f61011f..3e63de7 100644
--- src/enums.cc
+++ src/enums.cc
@@ -124,4 +124,9 @@ void export_enums(py::module_& m) {
         .value("Info", Logger::LEVEL_INFO)
         .value("Warn", Logger::LEVEL_WARN)
         .value("Error", Logger::LEVEL_ERROR);
+
+    enum_<ForkEvent>(m, "ForkEvent")
+        .value("Prepare", ForkPrepare)
+        .value("Parent", ForkParent)
+        .value("Child", ForkChild);
 }

Still crashing:

Thread 0 Crashed::  Dispatch queue: com.apple.main-thread
0   _pulsar.cpython-311-darwin.so          0x10857682c void boost::asio::io_context::initiate_post::operator()<std::__1::function<void ()>&>(std::__1::function<void ()>&, boost::asio::io_context*) const + 184 (io_context.hpp:194)
1   _pulsar.cpython-311-darwin.so          0x108576768 void boost::asio::detail::completion_handler_async_result<std::__1::function<void ()>, void ()>::initiate<boost::asio::io_context::initiate_post, std::__1::function<void ()>&, boost::asio::io_context*>(boost::asio::io_context::initiate_post&&, std::__1::function<void ()>&, boost::asio::io_context*&&) + 44 (async_result.hpp:482)
2   _pulsar.cpython-311-darwin.so          0x108576730 boost::asio::constraint<detail::async_result_has_initiate_memfn<std::__1::function<void ()>&, void ()>::value, decltype(async_result<std::__1::decay<std::__1::function<void ()>&>::type, void ()>::initiate(declval<boost::asio::io_context::initiate_post&&>(), declval<std::__1::function<void ()>&>(), declval<boost::asio::io_context*&&>()))>::type boost::asio::async_initiate<std::__1::function<void ()>&, void (), boost::asio::io_context::initiate_post, boost::asio::io_context*>(boost::asio::io_context::initiate_post&&, std::__1::function<void ()>&, boost::asio::io_context*&&) + 40 (async_result.hpp:895)
3   _pulsar.cpython-311-darwin.so          0x108562098 decltype(async_initiate<std::__1::function<void ()>&, void ()>(decltype(__declval<std::__1::function<void ()>&>(0)) std::__1::declval<boost::asio::io_context::initiate_post>()(), fp, this)) boost::asio::io_context::post<std::__1::function<void ()>&>(std::__1::function<void ()>&) + 44 (io_context.hpp:206)
4   _pulsar.cpython-311-darwin.so          0x108562060 pulsar::ExecutorService::postWork(std::__1::function<void ()>) + 28 (ExecutorService.cc:130)
5   _pulsar.cpython-311-darwin.so          0x1085626dc pulsar::notifyForkOnIoService(std::__1::shared_ptr<pulsar::ExecutorService>, boost::asio::execution_context::fork_event) + 128 (ExecutorService.cc:166)
6   _pulsar.cpython-311-darwin.so          0x108562574 pulsar::ExecutorServiceProvider::notifyFork(pulsar::ForkEvent) + 200 (ExecutorService.cc:180)
7   _pulsar.cpython-311-darwin.so          0x108446e70 pulsar::ClientImpl::notifyFork(pulsar::ForkEvent) + 80 (ClientImpl.cc:783)
8   _pulsar.cpython-311-darwin.so          0x108386860 pulsar::Client::notifyFork(pulsar::ForkEvent) + 36 (Client.cc:195)
9   _pulsar.cpython-311-darwin.so          0x108249460 pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)::operator()(pulsar::Client*, pulsar::ForkEvent) const + 120 (pybind11.h:109)
10  _pulsar.cpython-311-darwin.so          0x1082493dc void pybind11::detail::argument_loader<pulsar::Client*, pulsar::ForkEvent>::call_impl<void, pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&, 0ul, 1ul, pybind11::detail::void_type>(pulsar::Client&&, pybind11::detail::index_sequence<0ul, 1ul>, pybind11::detail::void_type&&) && + 88 (cast.h:1439)
11  _pulsar.cpython-311-darwin.so          0x108248f78 std::__1::enable_if<std::is_void<void>::value, pybind11::detail::void_type>::type pybind11::detail::argument_loader<pulsar::Client*, pulsar::ForkEvent>::call<void, pybind11::detail::void_type, pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&>(pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&) && + 36 (cast.h:1413)
12  _pulsar.cpython-311-darwin.so          0x108248eac void pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent), void, pulsar::Client*, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void&&, pulsar::Client (*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pybind11::detail::function_call&)::operator()(pybind11::detail::function_call&) const + 132 (pybind11.h:249)
13  _pulsar.cpython-311-darwin.so          0x108248e0c void pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent), void, pulsar::Client*, pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void&&, pulsar::Client (*)(pulsar::ForkEvent), pybind11::name const&, pybind11::is_method const&, pybind11::sibling const&)::'lambda'(pybind11::detail::function_call&)::__invoke(pybind11::detail::function_call&) + 28 (pybind11.h:224)
14  _pulsar.cpython-311-darwin.so          0x1081e0494 pybind11::cpp_function::dispatcher(_object*, _object*, _object*) + 3960 (pybind11.h:929)
zbentley commented 1 year ago

A workaround that seemed effective at the application layer was to intentionally leak Client objects into a global in the child processes (an atfork child handler that appends them to a global array that's never emptied, and should be GC'd at interpreter shutdown but not before then).

Could something similar be done inside the client? Perhaps by tracking constructing PID and short-circuiting/no-opping destructor calls if the destructing PID and constructing PID mis-match?

BewareMyPower commented 1 year ago

Yes. We cannot delete or close a Client object anyway.

Perhaps by tracking constructing PID and short-circuiting/no-opping destructor calls if the destructing PID and constructing PID mis-match?

Yeah, we can record the PID when constructing a Client object and only call shutdown in the destructor when the PID is the same with the recorded PID.