ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.07k stars 396 forks source link

Async background task with result on main thread #217

Closed martinfinke closed 8 years ago

martinfinke commented 8 years ago

I would like to run an expensive task in the background (on the rxcpp event loop) and have an observable that will output the results on the main thread. I've read a previous issue about this (#151) and tried to do something similar.

My problem is that the task runs in the background only when it runs for the first time. After that, it always runs on the main thread.

Here's a simplified version of the code I have. First I have a Dispatcher singleton that exposes my framework's run loop to rxcpp:

class Dispatcher : private Timer {
public:
    static Dispatcher& get() {
        static Dispatcher instance;
        return instance;
    }
   schedulers::run_loop& getRunLoop() {
        return rl;
    }
private:
    schedulers::run_loop rl;
    Dispatcher() {
        startTimerHz(30);
    }
    void timerCallback() override {
        // This gets called periodically by the framework (on the main thread)
        while(!rl.empty() && rl.peek().when < rl.now()) {
            rl.dispatch();
        }
    }
};

Then I create an observable similar to this:

observable<String> foo(observable<String> a, observable<String> b) {
    return a.combine_latest(b)
        .subscribe_on(rxcpp::synchronize_event_loop())
        .map(/* some expensive blocking function that should run in the background */)
        .observe_on(rxcpp::observe_on_run_loop(Dispatcher::get().getRunLoop()));
}

Also, in the original issue (#151), a composite_subscription called lifetime is being used. What is this for?

kirkshoop commented 8 years ago

This answer is relevant http://stackoverflow.com/a/36523132

The subscribe_on operator will call subscribe for a and b on the event loop thread, but the map will run on the thread that a or b are calling on_next from. To guarantee the map runs on the event loop remove the subscribe_on and add another observe_on before the map.

martinfinke commented 8 years ago

Great! That works perfectly. I've just written something with debounce and a child process that would be a real headache without rxcpp. Thanks to you and everyone contributing to this project!