ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

run_loop dispatch optimization #601

Open famik opened 1 year ago

famik commented 1 year ago

I found a problem with rxqt: when the timeout operator is used, a subscriber that has already completed does not free its memory until the timeout expires. To solve this problem, I modified on_event_scheduled:

void on_event_scheduled() {
    while (!rxcpp_run_loop.empty() &&
            (rxcpp_run_loop.now() > rxcpp_run_loop.peek().when ||
            !rxcpp_run_loop.peek().what.is_subscribed())) { // important: also drain items that are no longer subscribed, even if not yet due
        rxcpp_run_loop.dispatch();
    }

    if (!rxcpp_run_loop.empty()) {
        const auto time_till_next_event = ms_until(rxcpp_run_loop.peek().when);
        timer->start(static_cast<int>(time_till_next_event.count()));
    }
}

A better approach is to modify the dispatch function itself, which reduces the number of lock calls:

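void dispatch() const {
    std::unique_lock<std::mutex> guard(state->lock);
    while (!state->q.empty()) {
        auto& peek = state->q.top();
        // Drop items whose subscription has ended without waiting for their due time.
        if (!peek.what.is_subscribed()) {
            state->q.pop();
            continue;
        }
        // The earliest remaining item is not due yet, so stop.
        if (clock_type::now() < peek.when) {
            break;
        }
        // Copy the schedulable before pop() invalidates the reference.
        auto what = peek.what;
        state->q.pop();
        state->r.reset(state->q.empty());
        // Run the action without holding the lock.
        guard.unlock();
        what(state->r.get_recurse());
        guard.lock();
    }
}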

And then on_event_scheduled can be simplified:

void on_event_scheduled() {
    rxcpp_run_loop.dispatch();

    if (!rxcpp_run_loop.empty()) {
        const auto time_till_next_event = ms_until(rxcpp_run_loop.peek().when);
        timer->start(static_cast<int>(time_till_next_event.count()));
    }
}
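
The pruning idea in the modified dispatch can be tried outside RxCpp with a small stand-alone model. This is only a sketch under assumptions: `toy_item`, `later_first` and `toy_loop` are hypothetical names that are not part of RxCpp or rxqt, and a `std::shared_ptr<bool>` stands in for the subscription state that `is_subscribed()` reports.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for a scheduled item: a due time, a shared
// "still subscribed" flag, and the action to run.
struct toy_item {
    std::chrono::steady_clock::time_point when;
    std::shared_ptr<bool> subscribed;
    std::function<void()> action;
};

// Orders the priority queue so the earliest due time is on top.
struct later_first {
    bool operator()(const toy_item& a, const toy_item& b) const {
        return a.when > b.when;
    }
};

class toy_loop {
    std::mutex lock_;
    std::priority_queue<toy_item, std::vector<toy_item>, later_first> q_;

public:
    void schedule(toy_item item) {
        std::lock_guard<std::mutex> guard(lock_);
        q_.push(std::move(item));
    }

    std::size_t size() {
        std::lock_guard<std::mutex> guard(lock_);
        return q_.size();
    }

    // Runs every due item and discards unsubscribed items at the head of
    // the queue, taking the lock once per call plus once per action run.
    // Returns the number of actions that were run.
    int dispatch() {
        int ran = 0;
        std::unique_lock<std::mutex> guard(lock_);
        while (!q_.empty()) {
            const toy_item& top = q_.top();
            if (!*top.subscribed) {
                q_.pop();   // release the item now instead of at its due time
                continue;
            }
            if (std::chrono::steady_clock::now() < top.when) {
                break;      // earliest live item is not due yet
            }
            auto action = top.action;  // copy before pop() invalidates top
            q_.pop();
            guard.unlock();            // do not hold the lock while running
            action();
            ++ran;
            guard.lock();
        }
        return ran;
    }
};
```

Note that in this model, as in the dispatch above, only the head of the queue is pruned: an unsubscribed item that sits behind a live item that is not yet due stays in the queue until it reaches the top.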