ericniebler / range-v3

Range library for C++14/17/20, basis for C++20's std::ranges

multithreading #921

Open AlexanderZvyagin opened 5 years ago

AlexanderZvyagin commented 5 years ago

I started to play with the range + multithreading combination, but have not made much progress so far. Is there an example of how they can be combined with a nice functional-style syntax?

To be more specific, I want to execute some threads in parallel, but the code below runs them sequentially. A hint or a reference to "the best practice" would be greatly appreciated.

Thanks! Alexander.

#include <chrono>
#include <range/v3/all.hpp>
#include <thread>
#include <cstdio>

using namespace std::chrono_literals;

int main(void) {

    unsigned N = 2; // number of threads to execute

    auto worker = [](int i) {
        fprintf(stderr,"thread %d: start to simulate the hard work\n",i);
        std::this_thread::sleep_for(100ms);
        fprintf(stderr,"thread %d: end\n",i);
        return true;
    };

    auto sample =
          ranges::view::iota(0)
        | ranges::view::transform([worker](int n){
            fprintf(stderr,"executing thread %d\n",n);
            return std::thread(worker,n); // a prvalue is already moved; std::move is redundant here
        })
        | ranges::view::transform([](std::thread &&t){
            fprintf(stderr,"waiting ...\n");
            t.join();
            fprintf(stderr,"got something!\n");
            return false;
        })
        | ranges::view::take(N);

    ranges::for_each(sample, [](int n) {
        fprintf(stderr,"result is %d\n",n);
    });

    fprintf(stderr,"end of main()\n");

    return 0;
}

The program's execution time is 200ms (instead of the expected 100ms on a 2-core machine), and the output is:

executing thread 0
waiting ...
thread 0: start to simulate the hard work
thread 0: end
got something!
result is 0
executing thread 1
waiting ...
thread 1: start to simulate the hard work
thread 1: end
got something!
result is 0
end of main()
kirkshoop commented 5 years ago

It depends on the goal.

**Process data that is already in memory, in parallel.**

There is some exploration of adding support for this to range-v3 in this repo:

https://github.com/codeplaysoftware/parallelts-range-exploration

    gstorm::sycl_exec exec;

    auto ga = std::experimental::copy(exec, va);
    auto gb = std::experimental::copy(exec, vb);

    auto multiplied = ranges::view::transform(ga, gb, std::multiplies<int>{});
    auto result = std::experimental::reduce(exec, multiplied, 0, std::plus<int>{});

    auto expected = ranges::accumulate(ranges::view::transform(va, vb, std::multiplies<int>{}), 0, std::plus<int>{});

**Process data as it arrives, concurrently.**

I wrote an application that uses rxcpp to collect data from web requests and then uses range-v3 to process each set as it arrives.

https://github.com/kirkshoop/twitter/blob/master/main.cpp#L754

    auto sentimentupdates = ts |
        onlytweets() |
        buffer_with_time(milliseconds(500), tweetthread) |
        filter([](const vector<Tweet>& tws){ return !tws.empty(); }) |
        rxo::map([=](const vector<Tweet>& tws) -> observable<Reducer> {
            vector<string> text = tws | 
                ranges::view::transform([](Tweet tw){
                    auto& tweet = tw.data->tweet;
                    return tweettext(tweet);
                });
            return sentimentrequest(poolthread, factory, . . . , text) |
                rxo::map([=](const string& body){
                    auto response = json::parse(body);
                    auto combined = ranges::view::zip(response["Results"]["output1"], tws);
// . . .