ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

run_loop scheduler thread safety #468

Open gyenesandras opened 6 years ago

gyenesandras commented 6 years ago

Hi,

First of all, thanks for this great library, it is fun to use.

I have some questions about the run_loop scheduler. I have an app that handles multiple clients simultaneously. Clients can subscribe to multiple streams (observables), and I want to serve every client on its own dedicated thread (one thread per client). I need to control the creation and destruction of the threads, so I decided to use the run_loop scheduler; each client thread has its own run_loop. Subscriptions look like the following:

rxcpp::schedulers::run_loop* rl = get_run_loop();
auto observe_on_rl = rxcpp::observe_on_run_loop(*rl);
auto obs = producer.get_observable(key);
obs.observe_on(observe_on_rl).subscribe([](event_data data) {
  // send data to the client
});

The main loop of the client thread function looks like this:

rxcpp::schedulers::run_loop run_loop;
// ...
while (run)
{
  while (!run_loop.empty() && run_loop.peek().when < run_loop.now())
  {
    run_loop.dispatch();
  }
  // waiting for events
}

I have the following questions:

1. Is run_loop the right choice for the scenario I described above?
2. I looked into the implementation and saw that run_loop::dispatch() is guarded by a mutex, but run_loop::empty() and run_loop::peek() simply forward the call to the underlying priority queue. What happens if a producer emits a new value from another thread (so the queue is modified) while the client thread calls empty() or peek()? Shouldn't those functions be guarded the way dispatch() is?

kirkshoop commented 6 years ago

Hi! I would not use run_loop for this case. I would create a new_thread worker for each client and wrap it with identity_same_worker or serialize_one_worker, depending on whether there would be any cross-thread calls (you said "Clients can subscribe to multiple streams (observables)"; if those streams come from different threads, I would use serialize_one_worker).

gyenesandras commented 6 years ago

Hi Kirk,

Thanks for your reply! I tried to implement what you suggested. At first I failed, because serialize_one_worker takes a scheduler: if I pass a new_thread scheduler, it creates a new thread for each subscription, which is not what I want. So I tried again and came up with something like this:

    rxcpp::schedulers::new_thread thd1{client_thread_factory};
    rxcpp::composite_subscription cs1;
    auto w1 = thd1.create_worker(cs1);
    auto cli1 = rxcpp::serialize_same_worker(w1);

    p1.get_observable().observe_on(cli1).subscribe([](int x) {
        safe_print("client #1, p1 value: %d", x);
    });
    p2.get_observable().observe_on(cli1).subscribe([](int x) {
        safe_print("client #1, p2 value: %d", x);
    });

p1 and p2 are producers, implemented using behavior subjects. This way all 'client ... value' messages appear on the same thread. Is that what you meant above? If not, could you give me a hint as to how you would implement it?

kirkshoop commented 6 years ago

This is close. If you create a new worker for each producer, I think you will get what you want.

rxcpp::schedulers::new_thread thd1{client_thread_factory};

rxcpp::composite_subscription cs1;
auto w1 = thd1.create_worker(cs1);
auto cli1 = rxcpp::serialize_same_worker(w1);

p1.get_observable().observe_on(cli1).subscribe([](int x) {
    safe_print("client #1, p1 value: %d", x);
});

rxcpp::composite_subscription cs2;
auto w2 = thd1.create_worker(cs2);
auto cli2 = rxcpp::serialize_same_worker(w2);
p2.get_observable().observe_on(cli2).subscribe([](int x) {
    safe_print("client #1, p2 value: %d", x);
});