xhawk18 / promise-cpp

C++ promise/A+ library in Javascript style.
MIT License

Assertion error in multi-threaded programs #18

Open hotwatermorning opened 2 years ago

hotwatermorning commented 2 years ago

Hi, I'm trying to use promise-cpp in multi-threaded programs, but sometimes the following assertion error occurs in promise-cpp.

promise_inl.hpp(161)

            std::list<std::shared_ptr<Task>> &pendingTasks = promiseHolder->pendingTasks_;
            //promiseHolder->dump();
            assert(pendingTasks.front() == task); // Assertion failed: (pendingTasks.front() == task), function call, file promise_inl.hpp, line 161.
            pendingTasks.pop_front();
            //promiseHolder->dump();

The README says promise-cpp is thread safe, but it does not seem to be thread safe in some situations.

Is this a bug? Or should I fix how I use promise-cpp? Thanks.

Steps to reproduce:

Build and run this code snippet.

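#include <iostream>
#include <memory>
#include <vector>
#include <thread>
#include <functional>
#include <mutex>
#include <atomic>
#include <deque>
#include <stdexcept>
#include <algorithm>           // std::for_each
#include <chrono>              // std::chrono::milliseconds
#include <condition_variable>  // std::condition_variable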

#define PROMISE_HEADONLY
#include <promise-cpp/promise.hpp>

struct ThreadPool
{
    ThreadPool() {}
    ~ThreadPool() {}

    void startThreads(int num) {
        stopThreads();
        _shouldStop = false;
        for(int i = 0; i < num; ++i) {
            auto th = std::thread([this] { threadProcess(); });
            _threads.push_back(std::move(th));
        }
    }

    void stopThreads() {
        _shouldStop = true;
        std::for_each(_threads.begin(), _threads.end(),
                      [](auto &th) { th.join(); });
    }

    void addTask(std::function<void()> f) {
        auto lock = std::unique_lock<std::mutex>(_mtx);
        _tasks.push_back(f);
        _cvEmpty.notify_all();
    }

    void threadProcess()
    {
        for( ; ; ) {
            if(_shouldStop) { break; }

            std::function<void()> f;
            {
                auto lock = std::unique_lock<std::mutex>(_mtx);

                auto result = _cvEmpty.wait_for(lock, std::chrono::milliseconds(100), [&] {
                    return _tasks.size() > 0;
                });

                if(result == false) { continue; }

                f = _tasks.front();
                _tasks.pop_front();
            }

            f();
        }
    }

private:
    std::mutex _mtx;
    std::deque<std::function<void()>> _tasks;
    std::vector<std::thread> _threads;
    std::atomic<bool> _shouldStop = { false };
    std::condition_variable _cvEmpty;

public:
    static ThreadPool * getInstance() { return _instance.load(); }
    static ThreadPool * setInstance(ThreadPool *inst) { return _instance.exchange(inst); }

private:
    static inline std::atomic<ThreadPool *> _instance;
};

promise::Promise yield_async()
{
    return promise::newPromise([](promise::Defer defer) {
        ThreadPool::getInstance()->addTask([defer] {
            defer.resolve();
        });
    });
}

int main()
{
    std::cout << "Start." << std::endl;
    ThreadPool pool;
    ThreadPool::setInstance(&pool);

    pool.startThreads(10);

    auto finished = std::make_shared<bool>(false);

    static auto func = [=] {
        return yield_async().then([] {
            return true;
        });
    };

    {
        auto count = std::make_shared<std::atomic<int>>(0);

        promise::doWhile([=](promise::DeferLoop loop) {
            if(*count >= 300) {
                loop.doBreak(count->load());
                return;
            }

            promise::newPromise([=](promise::Defer defer) {
                func().then([=](bool b) {
                    bool x = b;
                }).then([=] {
                    return yield_async();
                }).then([=] {
                    defer.resolve();
                });

            }).then([=] {
                *count += 1;
                std::cout << "count: " << *count << std::endl;
                yield_async().then([=] {
                    loop.doContinue();
                });
            });
        }).then([=](int n) {
            std::cout << "hello : " << n << std::endl;
            *finished = true;
        });
    };

    pool.stopThreads();
    ThreadPool::setInstance(nullptr);

    std::cout << "Finished." << std::endl;
    return 0;
}

xhawk18 commented 2 years ago

Thank you. It seems like a bug; I'll check it soon!

xhawk18 commented 2 years ago

Hi, I've sent a patch here that may fix this issue.

By the way, the test code above may deadlock, and the thread pool may need more than 10 threads. The situation may be: thread A is waiting for the thread pool to become idle, while thread B is waiting for thread A to call resolve.
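
To illustrate the general mechanism behind the pool-size remark (not a diagnosis of the reproduction above), here is a minimal sketch in plain C++ without promise-cpp. The function name firstTaskSawSecond and the 300 ms timeout are made up for this example. Task 1 blocks its worker until task 2 has run; with a single worker, task 2 stays stuck in the queue behind it, so a timeout is used here to avoid hanging forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: returns true if task 1 observed task 2 finishing
// before its timeout expired.
bool firstTaskSawSecond(int workers) {
    std::mutex mtx;
    std::condition_variable cv;
    bool secondDone = false;
    bool sawSecond = false;

    std::mutex queueMtx;
    std::deque<std::function<void()>> queue;

    // Task 1 occupies a worker while it waits for task 2.
    queue.push_back([&] {
        std::unique_lock<std::mutex> lock(mtx);
        sawSecond = cv.wait_for(lock, std::chrono::milliseconds(300),
                                [&] { return secondDone; });
    });
    // Task 2 is what task 1 is waiting for.
    queue.push_back([&] {
        {
            std::lock_guard<std::mutex> lock(mtx);
            secondDone = true;
        }
        cv.notify_all();
    });

    // Each worker pops tasks until the queue is empty.
    auto worker = [&] {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(queueMtx);
                if (queue.empty()) { return; }
                task = std::move(queue.front());
                queue.pop_front();
            }
            task();
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i) { threads.emplace_back(worker); }
    for (auto &t : threads) { t.join(); }
    return sawSecond;
}
```

With one worker, task 1 can only time out, because nothing else is free to run task 2. With two workers, the second worker normally picks up task 2 and wakes task 1. Without the timeout, the one-worker case would block forever, which is the kind of stall a too-small pool can cause.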

hotwatermorning commented 2 years ago

Hi, thank you for looking into this issue.

The patch adds a condition variable cond_ and waits for cond_ to be notified, but no one sends any notification to cond_. It seems that's why the deadlock occurs.

So I suppose code that sends a notification to cond_ should be added to promise_inl.hpp.
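
As a rough sketch of the point about cond_ (the Gate struct and its members are hypothetical stand-ins, not the code in the patch), a wait on a condition variable only makes progress if some other code path changes the guarded state and then notifies:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the state guarded by cond_ in the patch.
struct Gate {
    std::mutex mtx;
    std::condition_variable cond_;
    bool ready = false;

    // Blocks until release() has set ready.
    void waitUntilReady() {
        std::unique_lock<std::mutex> lock(mtx);
        cond_.wait(lock, [this] { return ready; });
    }

    // Without the notify_all() call, a thread already blocked in
    // waitUntilReady() would never wake up.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            ready = true;
        }
        cond_.notify_all();
    }
};

// Returns true once the waiting thread has been woken up.
bool runGateDemo() {
    Gate gate;
    bool woke = false;
    std::thread waiter([&] {
        gate.waitUntilReady();
        woke = true;
    });
    gate.release();
    waiter.join();  // join() makes the write to woke visible here
    return woke;
}
```

The predicate form of wait() also covers the case where release() runs before the waiter starts waiting, so the demo does not depend on thread timing.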

SmileGobo commented 6 months ago

Hi, I've sent a patch here that may fix this issue.

By the way, the test code above may deadlock, and the thread pool may need more than 10 threads. The situation may be: thread A is waiting for the thread pool to become idle, while thread B is waiting for thread A to call resolve.

Hi, this deadlock can be reproduced as follows:

#include <gtest/gtest.h>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <promise/promise.hpp>
#include <condition_variable>

struct Event {
    std::condition_variable_any cv {};
    std::mutex mtx {};
    bool isset{false};
    std::unique_lock<std::mutex> lock() {
        return std::unique_lock{mtx};
    }
    void wait() {
        auto l = lock();
        while (!isset) cv.wait(l);
    }

    void set() {
        auto l = lock();
        isset = true;
        cv.notify_all();
    }
};

TEST(promise, parallel_lock) {
    auto pms = promise::newPromise();

    Event race;
    pms
        .then([&](int val){
            auto rslt = promise::resolve(std::string{"main resolve"});
            race.set();
            return rslt;
        })
        .fail([](const std::string& str){
            return std::string{str};
        });

    std::thread t {[&](){
        race.wait();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        pms.then([](const std::string& str){
            return std::string{str};
        });
    }};
    pms.resolve(1);
    if (t.joinable()) t.join();
    EXPECT_TRUE(true);
}

For better reproducibility, a patch is also required: add std::this_thread::sleep_for(std::chrono::milliseconds(100)); at this line:
https://github.com/xhawk18/promise-cpp/blob/556dce7183dc76a229e5f6f82bd5e7ef1203bbfd/include/promise-cpp/promise_inl.hpp#L291

Tested on commit 556dce718.

One proposal is to extend the kProccess states enum so that tasks are still added to the chain but are not executed if processing has already begun in another thread.
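
A minimal sketch of that idea, assuming a mutex-guarded queue. ChainState, TaskChain, submit and drain are hypothetical names for illustration only and are not part of promise-cpp. A caller that finds the chain already in the processing state only enqueues its task and leaves execution to the thread that is draining the chain.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical states; not the actual promise-cpp enum.
enum class ChainState { kIdle, kProcessing };

class TaskChain {
public:
    // Appends a task. Returns true if this call processed the chain,
    // false if the task was only queued because processing had already begun.
    bool submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            tasks_.push_back(std::move(task));
            if (state_ == ChainState::kProcessing) {
                return false;  // the active processor will run it
            }
            state_ = ChainState::kProcessing;
        }
        drain();
        return true;
    }

private:
    // Runs queued tasks in order until the queue is empty.
    void drain() {
        for (;;) {
            std::function<void()> next;
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if (tasks_.empty()) {
                    state_ = ChainState::kIdle;
                    return;
                }
                next = std::move(tasks_.front());
                tasks_.pop_front();
            }
            next();  // run outside the lock so a task may call submit()
        }
    }

    std::mutex mtx_;
    std::deque<std::function<void()>> tasks_;
    ChainState state_ = ChainState::kIdle;
};

// Single-threaded demonstration: a task submitted while the chain is being
// processed is queued and then run, in order, by the active processor.
int runChainDemo() {
    TaskChain chain;
    int order = 0;
    bool nestedProcessed = true;
    chain.submit([&] {
        nestedProcessed = chain.submit([&] { order = order * 10 + 2; });
        order = order * 10 + 1;
    });
    return nestedProcessed ? -1 : order;  // returns 12
}
```

Because only one caller at a time can move the state from kIdle to kProcessing, the front of the queue is always popped by the same thread that runs it. That is the property the failing assertion in the original report checks.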