facebookexperimental / libunifex

Unified Executors

Question: when_all taking a container of tasks #251

Closed matthijs closed 2 years ago

matthijs commented 3 years ago

Hi,

I am trying to schedule a lot of tasks on the thread pool. An example is probably the easiest way to show what I am trying to achieve:

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <thread>
#include <vector>

#include <unifex/static_thread_pool.hpp>
#include <unifex/transform.hpp>
#include <unifex/await_transform.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/when_all.hpp>
#include <unifex/task.hpp>

//template <typename Scheduler, typename F>
//auto run_on(Scheduler&& s, F&& func) {
//  return unifex::transform(unifex::schedule((Scheduler&&)s), (F&&)func);
//}

int main(int argc, char** argv)
{
    unifex::static_thread_pool tp;
    auto sched = tp.get_scheduler();
    std::atomic<uint32_t> x = 0;

    auto makeTask = [&](unifex::static_thread_pool::scheduler scheduler, uint32_t a) -> unifex::task<void> {
        co_await unifex::schedule(scheduler);
        std::printf("task %u\n", a);
        std::this_thread::sleep_for(std::chrono::milliseconds{10});
        x++;

        co_return;
    };

    // Generate 1000 tasks
    std::vector<unifex::task<void>> tasks;
    tasks.reserve(1000);
    for(uint32_t i = 0; i < 1000; i++)
    {
        tasks.push_back(makeTask(sched, i));
    }

    // Schedule on the pool
    unifex::sync_wait(unifex::when_all(makeTask(sched, 1), makeTask(sched, 2)));
    //unifex::sync_wait(unifex::when_all(std::move(tasks)));

    auto start = std::chrono::system_clock::now();
    //for(auto&& t : tasks) {
    //  unifex::sync_wait(std::move(t));
    //}

    std::cout << "time elapsed: " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count() << "ms" << std::endl;
    std::cout << "result: " << x << std::endl;
    return 0;
}

When using the for loop with `sync_wait` it is still sequential (obviously). How can I schedule the coroutines so that they are executed on the thread pool in parallel?

Regards, Matthijs

ericniebler commented 3 years ago

This is a missing feature. There currently isn't a satisfactory solution. You could when_all N at a time in a loop. That's the best you can do today.

matthijs commented 3 years ago

Ah thanks!

If anyone else comes across this limitation, I wrote this very simple snippet to handle it:

template<typename Rng, std::size_t... I>
constexpr auto when_all_vec_impl(Rng& rng, std::index_sequence<I...>)
{
    // The range must be taken by non-const reference: unifex::task is
    // move-only, so the elements have to be actually moved out.
    return unifex::when_all(std::move(rng[I])...);
}
template<typename Rng, std::size_t N, typename Indices = std::make_index_sequence<N>>
constexpr auto when_all_vec(Rng& rng)
{
    return when_all_vec_impl(rng, Indices{});
}

// Create tasks...
std::vector<unifex::task<void>> tasks;
//tasks.push_back(...);

// Call when_all_vec
constexpr std::size_t num = 100;
unifex::sync_wait(when_all_vec<decltype(tasks), num>(tasks)); // assuming tasks has at least 100 items

Of course you still need to handle the remaining tasks in the container yourself.

ccotter commented 2 years ago

There currently isn't a satisfactory solution.

libunifex newbie here. I'm curious - is there a fundamental problem preventing a satisfactory solution? Would some sender/receiver analogy of cppcoro's when_all do the trick (i.e., when_all on a container requires memory allocation)?

Separate question: in general, I can't find any wording for when_all accepting a vector of senders (or a range, or begin and end iterators) in P2300. Would P2300 one day gain such language, or would that come in another paper?

ericniebler commented 2 years ago

There's nothing technical preventing a range-based sender/receiver when_all algorithm. Nobody has written one, that's all.

What I neglected to mention in my original answer is that the best way to do this in libunifex today is to spawn the work in an async_scope and then wait on the scope for all the work to finish.

EDIT: And to answer your other question, P2300 itself will probably not get more algorithms, but follow-on papers are certain to add more, this one included.

EDIT 2: I know @lums658 wants this algorithm also and is trying his hand at implementing it, but I don't know if it's for libunifex or the P2300 reference implementation.

ccotter commented 2 years ago

Thank you - I'll check out async_scope as well.

As a learning exercise for myself, I've got a half-baked implementation working that's effectively a "replace tuple with vector" version of the variadic implementation, if anyone wants to compare notes.

ccotter commented 2 years ago

async_scope seems to do just the trick, acting as a nice provider of heap allocation as needed for creating a when_all for a runtime container of tasks. I'll leave https://github.com/ccotter/libunifex/blob/when_all/examples/when_all_scope.cpp#L61 as a partially complete attempt at a generic solution built on top of async_scope with examples at the bottom of the file.

janondrusek commented 2 years ago

#454 introduced a when_all_range algorithm. It takes either a std::vector<Sender> or a pair of iterators.