ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Confused about the behavior of *_event_loop and *_new_thread #455

Open · sparkkk opened this issue 6 years ago

sparkkk commented 6 years ago

Hi,

Here is my program:

#include <iostream>
#include <thread>
#include "rxcpp/rx.hpp"

using namespace std;

int main() {
    for (int i = 0; i < 2; ++i) {
        auto sched = rxcpp::serialize_new_thread();
        //auto sched = rxcpp::serialize_event_loop();
        rxcpp::observable<>::from(1, 2, 3)
            .observe_on(sched)
            .map([](int v) {
                cout<<"map: tid="<<this_thread::get_id()<<" v="<<v<<endl;
                return v+3;
            })
            .observe_on(sched)
            .as_blocking()
            .subscribe([](int v) {
                cout<<"next: tid="<<this_thread::get_id()<<" v="<<v<<endl;
            });
    }
}

My aim is to run the two outputs ("map" and "next") on two different threads, while each output stays on the same thread across loop iterations.

When I choose sched = serialize_new_thread(), I get the following output, which satisfies my aim:

map: tid=0x7000086b1000 v=1
map: tid=0x7000086b1000 v=2
map: tid=0x7000086b1000 v=3
next: tid=0x70000862e000 v=4
next: tid=0x70000862e000 v=5
next: tid=0x70000862e000 v=6
map: tid=0x7000086b1000 v=1
map: tid=0x7000086b1000 v=2
map: tid=0x7000086b1000 v=3
next: tid=0x70000862e000 v=4
next: tid=0x70000862e000 v=5
next: tid=0x70000862e000 v=6

And when I choose serialize_event_loop(), the threads in the second loop are different:

map: tid=0x70000ecdf000 v=1
next: tid=0x70000ec5c000 v=4
map: tid=0x70000ecdf000 v=2
next: tid=0x70000ec5c000 v=5
map: tid=0x70000ecdf000 v=3
next: tid=0x70000ec5c000 v=6
map: tid=0x70000ebd9000 v=1
map: tid=0x70000ebd9000 v=2
map: tid=0x70000ebd9000 v=3
next: tid=0x70000ed62000 v=4
next: tid=0x70000ed62000 v=5
next: tid=0x70000ed62000 v=6

These results seem weird to me because they are the opposite of what I expected: "new_thread" should start a new thread each time it is executed, while "event_loop" should execute on the same thread only if I pass the same handle to it.

Would you please help me to understand these things correctly?

kirkshoop commented 6 years ago

Hi!

I would expect that new_thread is exhibiting some thread reuse by the OS or the thread libraries.

The event_loop creates multiple threads and round-robins workers across them, so an event loop is very unlikely to reuse the same thread.

To be explicit about the thread, the pattern is to create a new_thread or event_loop worker for each desired context outside the loop, and then wrap it in a same_worker so that all copies use the same worker for all work items. One of these workers, wrapped in a same_worker, would be passed to each of the observe_on calls.
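
kirkshoop's explanation can be modeled without RxCpp. The following is a minimal stdlib-only C++ sketch, not RxCpp's implementation. SingleThreadWorker and RoundRobinLoop are hypothetical stand-ins for a scheduler worker and for an event-loop scheduler that, as described above, round-robins workers over a fixed set of threads. Because both workers are created once, outside the loop, the "map" step stays on one thread and the "next" step on another in every iteration. That is the effect the same_worker pattern is meant to give.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one scheduler worker: a dedicated thread that
// runs scheduled tasks one at a time, in FIFO order.
class SingleThreadWorker {
public:
    SingleThreadWorker() : thread_([this] { run(); }) {}
    ~SingleThreadWorker() {
        { std::lock_guard<std::mutex> guard(mutex_); stopping_ = true; }
        wake_.notify_one();
        thread_.join();  // run() drains any queued tasks before it returns
    }
    void schedule(std::function<void()> task) {
        { std::lock_guard<std::mutex> guard(mutex_); queue_.push_back(std::move(task)); }
        wake_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stopping and nothing left to run
                task = std::move(queue_.front());
                queue_.pop_front();
            }
            task();
        }
    }
    std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<std::function<void()>> queue_;
    bool stopping_ = false;
    std::thread thread_;  // declared last: starts only after the members above exist
};

// Hypothetical event-loop-like pool: a fixed set of threads, and each
// create_worker() call hands out the next one in round-robin order.
class RoundRobinLoop {
public:
    explicit RoundRobinLoop(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i)
            threads_.push_back(std::make_unique<SingleThreadWorker>());
    }
    SingleThreadWorker& create_worker() { return *threads_[next_++ % threads_.size()]; }

private:
    std::vector<std::unique_ptr<SingleThreadWorker>> threads_;
    std::size_t next_ = 0;
};

struct Observed {
    std::vector<std::thread::id> map_ids, next_ids;
    std::vector<int> next_values;
};

// Two loop iterations of from(1, 2, 3) -> "map" on worker1 -> "next" on worker2.
Observed run_two_iterations() {
    Observed out;
    std::promise<void> done;
    std::future<void> finished = done.get_future();
    int remaining = 6;  // only touched on worker2's thread
    {
        RoundRobinLoop loop(2);
        // Created once, outside the loop, and reused by every iteration.
        SingleThreadWorker& worker1 = loop.create_worker();
        SingleThreadWorker& worker2 = loop.create_worker();
        for (int i = 0; i < 2; ++i) {
            for (int v : {1, 2, 3}) {
                worker1.schedule([&, v] {  // "map" step
                    out.map_ids.push_back(std::this_thread::get_id());
                    worker2.schedule([&, mapped = v + 3] {  // "next" step
                        out.next_ids.push_back(std::this_thread::get_id());
                        out.next_values.push_back(mapped);
                        if (--remaining == 0) done.set_value();
                    });
                });
            }
        }
        finished.wait();
    }  // loop's destructor joins both worker threads here
    return out;
}
```

Call run_two_iterations() from main and compare the recorded thread ids. With RoundRobinLoop(2), the two workers are necessarily different threads. Suppose create_worker() were instead called inside the loop, on a pool with more threads than workers per iteration (say four). The second iteration would then land on different threads, much like the serialize_event_loop output in the issue.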

sparkkk commented 6 years ago

Hi Kirkshoop,

Thank you very much! I got it working with the following code:

#include <iostream>
#include <thread>
#include "rxcpp/rx.hpp"

using namespace std;
using namespace rxcpp;

int main() {
    auto sched = schedulers::make_event_loop();
    // Both workers are created once, outside the loop, so every iteration reuses them.
    auto worker1 = sched.create_worker();
    auto worker2 = sched.create_worker();
    for (int i = 0; i < 2; ++i) {
        observable<>::from(1, 2, 3)
            .observe_on(identity_same_worker(worker1))
            .map([](int v) {
                cout<<"map: tid="<<this_thread::get_id()<<" v="<<v<<endl;
                return v+3;
            })
            .observe_on(identity_same_worker(worker2))
            .as_blocking()
            .subscribe([](int v) {
                cout<<"next: tid="<<this_thread::get_id()<<" v="<<v<<endl;
            });
    }
}

Excuse me, but I have another question. What do the coordination types mean, and what is the difference between them? These are the coordination types I mean:

identity_one_worker
observe_on_one_worker
serialize_one_worker
synchronize_in_one_worker

Thanks!

kirkshoop commented 6 years ago

Hi,

I wrote a description a while ago here: http://stackoverflow.com/a/30294037/599231

It is largely duplicated in the repo: https://github.com/ReactiveX/RxCpp/blob/master/DeveloperManual.md

The short answer is that coordinations provide tools to coordinate multiple observables and tasks on the same scheduler worker. The operators use them so that they depend on the coordinations for all synchronization.
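
To make "operators depend on the coordinations for all synchronization" concrete, here is a minimal stdlib-only C++ sketch, not RxCpp code. It shows two ways a coordination could wrap an observer. The first passes calls straight through, in the spirit of identity_*. The second serializes calls with a mutex. IdentityObserver, SerializedObserver and sum_from_threads are hypothetical names. The sketch models only on_next and ignores the scheduler workers that RxCpp's coordinations also provide.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Pass-through wrapper in the spirit of identity_*: calls go straight to the
// destination, so concurrent callers would race on whatever dest touches.
struct IdentityObserver {
    std::function<void(int)> dest;
    void on_next(int v) const { dest(v); }
};

// Mutex-guarded wrapper (hypothetical): every call takes the lock, so callers
// on different threads reach dest strictly one at a time. The mutex is held
// through a shared_ptr so that copies of the wrapper share a single lock.
struct SerializedObserver {
    std::function<void(int)> dest;
    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
    void on_next(int v) const {
        std::lock_guard<std::mutex> guard(*lock);
        dest(v);
    }
};

// Pushes 1..per_thread from each of `threads` producer threads into a
// SerializedObserver whose destination is a plain, non-atomic running total.
// Swapping in IdentityObserver here would be a data race (undefined behavior).
long sum_from_threads(int threads, int per_thread) {
    long total = 0;  // protected only by SerializedObserver's mutex
    SerializedObserver obs{[&total](int v) { total += v; }};
    std::vector<std::thread> producers;
    for (int t = 0; t < threads; ++t)
        producers.emplace_back([&obs, per_thread] {
            for (int i = 1; i <= per_thread; ++i) obs.on_next(i);
        });
    for (auto& p : producers) p.join();
    return total;
}
```

For example, sum_from_threads(4, 1000) always returns 4 × 500500 = 2002000, because the lock prevents lost updates to total. The pass-through wrapper is only safe when a single thread calls it, which is the kind of guarantee an identity-style coordination leaves to the caller.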

sparkkk commented 6 years ago

Hi,

I have read the manual, and here is my understanding:

identity_*: no protection between threads
serialize_*: protected by whatever
synchronize_*: protected by a mutex
observe_on_*: makes the worker pick up items one by one, so there is no coordination issue

Am I right?

Thanks.

david-hoze commented 4 years ago

Hi, I need an answer to this too.