Naios / continuable

C++14 asynchronous allocation aware futures (supporting then, exception handling, coroutines and connections)
https://naios.github.io/continuable/
MIT License
815 stars 44 forks source link

Continuable and executors doesn't seem to be working. #72

Closed pfeatherstone closed 3 months ago

pfeatherstone commented 3 months ago

@Naios

I am trying to build a basic task system using a custom thread pool and the continuable library. The code I'm using is as follows:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <deque>
#include <vector>
#include <functional>
#include <continuable/continuable.hpp>

namespace v1
{
    template<class T>
    class concurrent_queue
    {
    public:
        void close()
        {
            {
                std::unique_lock<std::mutex> l{lock};
                open = false;
            }
            pushed.notify_all();
        }

        template<class U>
        void push(U&& u)
        {
            {
                std::unique_lock<std::mutex> l{lock};
                queue.emplace_back(std::forward<U>(u));
            }
            pushed.notify_one();
        }

        template<class U>
        bool try_push(U&& u)
        {
            {
                std::unique_lock<std::mutex> l{lock, std::try_to_lock};
                if (!l)
                    return false;
                queue.emplace_back(std::forward<U>(u));
            }
            pushed.notify_one();
            return true;
        }

        bool pop(T& x)
        {
            std::unique_lock<std::mutex> l{lock};
            while (queue.empty() && open)
                pushed.wait(l);
            if (queue.empty())
                return false;
            x = std::move(queue.front());
            queue.pop_front();
            return true;
        }

        bool try_pop(T& x)
        {
            std::unique_lock<std::mutex> l{lock, std::try_to_lock};
            if (!l || queue.empty())
                return false;
            x = std::move(queue.front());
            queue.pop_front();
            return true;
        }

    private:
        std::deque<T>           queue;
        std::mutex              lock;
        std::condition_variable pushed;
        bool                    open{true};
    };

    class task_system
    {
    public:
        task_system(std::size_t nthreads = std::thread::hardware_concurrency())
        : queues(nthreads)
        {
            for (size_t i = 0 ; i < nthreads ; ++i)
                threads.emplace_back([this, i] {run(i);});
        }

        ~task_system()
        {
            for (auto& q : queues)  q.close();
            for (auto& t : threads) t.join();
        }

        template<class F>
        void async_detach(F&& f)
        {
            const auto i = index++;

            for(size_t n = 0 ; n < queues.size() * K ; ++n)
                if (queues[(i+n) % queues.size()].try_push(std::forward<F>(f)))
                    return;

            queues[i % queues.size()].push(std::forward<F>(f));
        }

    private:

        void run(const size_t i) 
        {
            while(true)
            {
                task t;

                for (size_t n = 0 ; n < queues.size() ; ++n)
                    if (queues[(i+n) % queues.size()].try_pop(t))
                        break;

                if (!t && !queues[i].pop(t))
                    break;

                t();
            }
        }

        using task = std::function<void()>;
        constexpr static std::size_t K{2}; // Tuneable spin parameter
        std::vector<std::thread>                threads;
        std::vector<concurrent_queue<task>>     queues;
        std::atomic<size_t>                     index{0};
    };
}

int main()
{
    v1::task_system tp(2);

    auto executor = [&](auto&& work) {
        tp.async_detach(std::forward<decltype(work)>(work));
    };

    auto ret = cti::make_ready_continuable()
    .then([] {
        printf("Hi from 1\n");
    }, executor)
    .then([] {
        printf("Hi from 2\n");
    }, executor)
    .then([] {
        printf("Hi from 3\n");
    }, executor)
    ;

    return 0;
}

This doesn't compile and I can't figure out why.


Commit Hash

master

Expected Behavior

I expect this to work as the documentation on custom executors is pretty simple and what i've written seems to conform.

Actual Behavior

Compilation error.

Steps to Reproduce

Run the code above.

Your Environment

pfeatherstone commented 3 months ago

Sorry, fixed it! Two fixes: Replace:

using task = std::function<void()>;

with

using task = std::move_only_function<void()>;

or

using task = fu2::unique_function<void()>;

then replace

auto executor = [&](auto&& work) {
        tp.async_detach(std::forward<decltype(work)>(work));
    };

with

auto executor = [&](auto&& work) {
        tp.async_detach([work_ = std::forward<decltype(work)>(work)] mutable { std::move(work_)(); });
    };