pybind / pybind11

Seamless operability between C++11 and Python
https://pybind11.readthedocs.io/
Other
15.73k stars 2.11k forks source link

[BUG]: Fatal Python error: PyThreadState_Get with c++ thread pool #4226

Open jagiella opened 2 years ago

jagiella commented 2 years ago

Required prerequisites

Problem description

I need to use a c++ implemented thread pool to run python functions/callsbacks. The thread pool has a python binding to accept functions (py::function) and the respective arguments (py::args and optionally py::kwargs).

For this purpose I used a thread pool example proposed in https://www.cnblogs.com/sinkinben/p/16064857.html and I just adapted its enqueue() function in order to accept py::functions, wrap them in a lambda expression and queue them in a task queue. The pool has a fix number of worker threads which will pick up and tasks from the queue.

The ref. counter of the arguments (py::args) are increased on enqueuing and decreased by the lambda function after execution of the py::function to ensure they are not garbage collected while still in the queue. The test example (see below) crashes after the first task finished and seems to be related to the args.dec_ref() call. If I comment that call out, it will run through, but leaves the ref counter increased...blocking garbage collection of the args. If I comment out the args.inc_ref() call during enqueue as well, it will crash during task execution.

Any idea what the problem is here? Any help is highly appreciated :)

Reproducible example code

Pool implementation (c++/pybind11)

#include <pybind11/pybind11.h>
#include <list>
#include <queue>
#include <future>
#include <chrono>
#include <ostream>
#include <iostream>
#include <condition_variable>

using namespace std::chrono_literals;
namespace py = pybind11;

class Pool {
private:
    int num_threads;
    bool stop;
    std::mutex mtx;
    std::condition_variable cv;
    std::list<std::future<void> > threads;
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
public:
    Pool(int numThreads = 2) :
            num_threads(numThreads), stop(false) {

        for (size_t i = 0; i < num_threads; ++i)
            {
                std::thread worker([this, i]() {
                    while (true)
                    {
                        std::cout << "[ thread "<<i<<" ]: wait for next task" << std::endl;
                        std::function<void()> task;
                        /* pop a task from queue, and execute it. */
                        {
                            std::unique_lock lock(mtx);
                            cv.wait(lock, [this]() { return stop || !tasks.empty(); });
                            if (stop && tasks.empty())
                                return;
                            /* even if stop = 1, once tasks is not empty, then
                             * excucte the task until tasks queue become empty
                             */
                            std::cout << "[ thread "<<i<<" ]: pick-up task" << std::endl;
                            task = std::move(tasks.front());
                            tasks.pop();
                        }
                        std::cout << "[ thread "<<i<<" ]: run task" << std::endl;
                        task();
                        std::cout << "[ thread "<<i<<" ]: task finished" << std::endl;
                    }
                });
                workers.emplace_back(std::move(worker));
            }
    }

    ~Pool() {
        /* stop thread pool, and notify all threads to finish the remained tasks. */
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (auto &worker : workers)
            worker.join();
    }

    void enqueue(py::function f, py::args args){
        args.inc_ref();
        tasks.emplace([f, args]() -> void {
            std::cerr << "[ lambda ]: enter" << std::endl;
            py::gil_scoped_acquire acquire;
            f(*args);
            args.dec_ref();
            std::cerr << "[ lambda ]: exit" << std::endl;
        });
        cv.notify_one();
    }
};

PYBIND11_MODULE(threadpool, m) {
    py::class_<Pool>(m, "Pool")
            .def(py::init<>())
            .def("enqueue", &Pool::enqueue);
}

python test script

# test.py
import time
import sys
from threadpool import Pool

def slot( i):
     time.sleep(0.01)
     print('hello %s' % (i))

if(__name__=="__main__"):
    a = "world"
    p = Pool()
    time.sleep(0.1)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))

    print('enqueue')
    p.enqueue(slot, a)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))

    time.sleep(0.1)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))

    print('exit')

Error output

$ PYTHONPATH=. python3 ../test2.py 
[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL)
Python runtime state: initialized

Thread 0x00007ff20d85a040 (most recent call first):
  File "/home/USER/eclipse-workspace/threadpool/build/../test2.py", line 29 in <module>
Abgebrochen (Speicherabzug geschrieben)

Error output (commenting out args.dec_ref();)

[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
[ thread 0 ]: wait for next task
#refs: slot=2, arg=5
exit

Error output (commenting out args.dec_ref(); and args.inc_ref(); )

[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL)
Python runtime state: initialized

Thread 0x00007f9c6062c040 (most recent call first):
  File "/home/nickj/eclipse-workspace/threadpool/build/../test2.py", line 29 in <module>
Abgebrochen (Speicherabzug geschrieben)
jagiella commented 1 year ago

I assume, a possible fix did finally not make it yet into the recently released 2.10.1 ?

rwgk commented 1 year ago

Sorry for the "FWD pybind11" noise, that was unintentional and completely unrelated to this PR (except for the noise). For completeness: I ran a script to create 29999 pywrapcc "FWD pybind11" dummy issues, so that the issue/PR numbers in the fork don't overlap with pybind11 numbering, which would be really confusing.

Usually I don't look at issues (impossible to keep up), but I try to look at all PRs.

But now that I'm looking: could this be a GIL-not-held issue? Your situation (C++ threads, callbacks) looks like a classical setup for GIL-related issues.

We have this on master: https://github.com/pybind/pybind11/pull/4246

It's also in v2.10.3. (Also in v2.10.2, but that was yanked, exactly because of that GIL check).

Could you please try your reproducer with v2.10.3, or master (or even with just the very small PR #4246 patched in)? Does that GIL check fire?

jagiella commented 1 year ago

I checked with the most recent version 2.10.3:

$ apt list pybind*
Auflistung… Fertig
pybind11-dev/lunar,lunar,now 2.10.3-1 all  [installiert]

I still get the following error, after the py::function call is finished in the thread:

$ PYTHONPATH=. python3 ../test_pool.py 
[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL)
Python runtime state: initialized

Thread 0x00007f013b48e040 (most recent call first):
  File "test_pool.py", line 29 in <module>
Abgebrochen (Speicherabzug geschrieben)

The python call runs and finishes correcly in its respective thread with the GIL held during its execution by the embedding lambda function. But it seems that after the c++ thread finished (and thus the scoped GIL acquisition is not valide anymore) somehow the GIL is required again and throws the error.

jagiella commented 1 year ago

Reproducible code:

# CMakeLists.txt
cmake_minimum_required(VERSION 3.24)

project(ThreadPool)

find_package(pybind11 REQUIRED)
pybind11_add_module(threadpool 
    pybind.cpp
)
# test_pool.py
import time
import sys
from threadpool import Pool

def slot( i):
     time.sleep(0.01)
     print('hello %s' % (i))

if(__name__=="__main__"):
    a = "world"
    p = Pool()
    time.sleep(0.1)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))

    print('enqueue')
    p.enqueue(slot, a)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))

    time.sleep(0.1)
    print('#refs: slot=%d, arg=%d' % (sys.getrefcount(slot), sys.getrefcount(a)))

    print('exit')
// pybind.cpp
#include <pybind11/pybind11.h>
#include <list>
#include <queue>
#include <future>
#include <chrono>
#include <ostream>
#include <iostream>
#include <condition_variable>

using namespace std::chrono_literals;
namespace py = pybind11;

#define PYBIND11_ASSERT_GIL_HELD_INCREF_DECREF

class Pool {
private:
    int num_threads;
    bool stop;
    std::mutex mtx;
    std::condition_variable cv;
    std::list<std::future<void> > threads;
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
public:
    Pool(int numThreads = 2) :
            num_threads(numThreads), stop(false) {

        for (size_t i = 0; i < num_threads; ++i)
            {
                std::thread worker([this, i]() {
                    while (true)
                    {
                        std::cout << "[ thread "<<i<<" ]: wait for next task" << std::endl;
                        std::function<void()> task;
                        /* pop a task from queue, and execute it. */
                        {
                            std::unique_lock lock(mtx);
                            cv.wait(lock, [this]() { return stop || !tasks.empty(); });
                            if (stop && tasks.empty())
                                return;
                            /* even if stop = 1, once tasks is not empty, then
                             * excucte the task until tasks queue become empty
                             */
                            std::cout << "[ thread "<<i<<" ]: pick-up task" << std::endl;
                            task = std::move(tasks.front());
                            tasks.pop();
                        }
                        std::cout << "[ thread "<<i<<" ]: run task" << std::endl;
                        task();
                        std::cout << "[ thread "<<i<<" ]: task finished" << std::endl;
                    }
                });
                workers.emplace_back(std::move(worker));
            }
    }

    ~Pool() {
        std::cout << "enter ~Pool()" << std::endl;
        /* stop thread pool, and notify all threads to finish the remained tasks. */
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (auto &worker : workers)
            worker.join();
        std::cout << "leave ~Pool()" << std::endl;
    }

    void enqueue(std::function<void()> f){
        tasks.emplace(f);
        cv.notify_one();
    }
    void enqueue_py(py::function f, py::args args){
        //args.inc_ref();
        tasks.emplace([f, args]() -> void {
            std::cerr << "[ lambda ]: enter" << std::endl;
            py::gil_scoped_acquire acquire;
            f(*args);
            //args.dec_ref();
            std::cerr << "[ lambda ]: exit" << std::endl;
        });
        cv.notify_one();
    }
};

PYBIND11_MODULE(threadpool, m) {
    py::class_<Pool>(m, "Pool")
            .def(py::init<>())
            .def("enqueue", &Pool::enqueue_py);
}
rwgk commented 1 year ago

Fatal Python error: PyThreadState_Get: the function must be called with the GIL held, but the GIL is released (the current Python thread state is NULL)

That's very similar to what I was expecting: see "could this be a GIL-not-held issue?" in my previous comment.

It may or may not make give you more information: you need to define PYBIND11_ASSERT_GIL_HELD_INCREF_DECREF to activate the GIL check. Did you do that already?

Usual suspects for GIL issues are destructors: try adding print statements in ~Pool() to see if the Fatal Python error: PyThreadState_Get message appears while the code in the destructor is executed.

If this doesn't help, please create a PR with the reproducer code.

jagiella commented 1 year ago

I added a #define PYBIND11_ASSERT_GIL_HELD_INCREF_DECREF right after header includes and namespace definitions and it does not have any impact on the error or outputs.

Also adding outputs to the Pool destructor indicates, that the destructor is never called before the GIL error happens.

It seems like the error happens on destruction of the lambda function that captures the python function (py::function) and argument (py::args) to make it beeing stored in the task queue and beeing picked up+executed by the worker threads as argument-less and return-value-less std::function:

tasks.emplace([f, args]() -> void {
            std::cerr << "[ lambda ]: enter" << std::endl;
            py::gil_scoped_acquire acquire;
            f(*args);
            //args.dec_ref();
            std::cerr << "[ lambda ]: exit" << std::endl;
        });
jagiella commented 1 year ago

Well I experimented with the lambda and found that changing the signature from [f, args]() -> void {} to [=]() -> void {} eliminates the GIL error. So the working version of the enqueue function looks like this:

    void enqueue_py(py::function f, py::args args){
        //args.inc_ref(); <- adding leads to GIL error
        tasks.emplace([=]() -> void {
            std::cerr << "[ lambda ]: enter" << std::endl;
            py::gil_scoped_acquire acquire;
            f(*args);
            args.dec_ref(); // <- removing leads to GIL error
            std::cerr << "[ lambda ]: exit" << std::endl;
        });
        cv.notify_one();
    }

Curiously, I still need to decrease the ref counter of args at the end of the lambda call and must not (!) increase it before enqueuing to not get GIL related runtime errors...

[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=3, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
hello world
[ lambda ]: exit
[ thread 0 ]: task finished
[ thread 0 ]: wait for next task
#refs: slot=2, arg=4
exit
enter ~Pool()
leave ~Pool()
rwgk commented 1 year ago

I had this before you posted your last comment:

I think you're right: f and args are copied when the lambda function is created (inc_refs) and the copies destroyed when the lambda function goes out of scope (dec_refs).

I'm not sure I can be helpful here (I tend to try to stay away from threads as far as I possibly can).

I'm uncertain why PYBIND11_ASSERT_GIL_HELD_INCREF_DECREF does not make a difference: if f and args are dec_refed without the GIL, I'd expect the guards to fire, unless some other error pre-empts that.

From a quick look at the code in pytypes.h: I believe neither py::function nor py::args have any built-in GIL handling. That means you need to find another way of passing them in.

Wild guess:

Other idea:

jagiella commented 1 year ago

I think you're right: f and args are copied when the lambda function is created (inc_refs) and the copies destroyed when the lambda function goes out of scope (dec_refs).

Seems to be like this. Though curiously, even with the working version I still need to manually dec_ref the args at the end of the lambda execution to avoid getting the GIL error (see last comment). But I must not inc_ref the args before queue as this will also lead to a GIL error. Thats a very confusiong behaviour :)

Wild guess:

  • Could it be sufficient to call task.clear(); in ~Pool() with the GIL held?

If the application closes with still pending jobs in the queue, that definitely has to be done this way. Nevertheless, the queue should only contain pending tasks. As soon as they are picked up they should be removed from the queue and thus also destroyed after they finished running.

Other idea:

  • What if you pass them in by reference ([&f, &args])? That will only work if you are sure that f and args outlive tasks. Maybe that's something easy to ensure?

I already tried that. That will lead to seg faults.

void enqueue_py(py::function &f, py::args &args){
        args.inc_ref();
        tasks.emplace([&f, &args]() -> void {
            std::cerr << "[ lambda ]: enter" << std::endl;
            py::gil_scoped_acquire acquire;
            f(*args);
            args.dec_ref();
            std::cerr << "[ lambda ]: exit" << std::endl;
        });
        cv.notify_one();
    }
[ thread 0 ]: wait for next task
[ thread 1 ]: wait for next task
#refs: slot=2, arg=4
enqueue
#refs: slot=2, arg=5
[ thread 0 ]: pick-up task
[ thread 0 ]: run task
[ lambda ]: enter
Speicherzugriffsfehler (Speicherabzug geschrieben) (segmentation fault)
rwgk commented 1 year ago

I already tried that. That will lead to seg faults.

If you wish to keep the rest of the code mostly free from GIL handling:

You could stash f and args also in another member array of Pool, to keep the Python objects alive. Then clear that array in ~Pool with the GIL held, after first clearing tasks.

I'd also = delete the Pool copy constructor and copy assignment operator, as a simple way to ensure those don't run without the GIL.

jagiella commented 1 year ago

What I still don't understand is, why I...

void enqueue_py(py::function f, py::args args){
    {
        py::gil_scoped_acquire acquire;
        //f.inc_ref();      <- optional
        //args.inc_ref();   <- adding leads to GIL error
    }
    tasks.emplace([=]() -> void {
        std::cerr << "[ lambda ]: enter" << std::endl;
        py::gil_scoped_acquire acquire;
        f(*args);
        //f.dec_ref();     <- optional
        args.dec_ref(); // <- removing leads to GIL error
        std::cerr << "[ lambda ]: exit" << std::endl;
    });
    cv.notify_one();
}

Conclusions:

rwgk commented 1 year ago

Sorry I don't have the free bandwidth to look at this more. Just sharing my impression:

args.dec_ref(); // <- removing leads to GIL error