ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

concat multi-thread issue, applying subscribe_on(...) to the inner observable #503

Open zued opened 5 years ago

zued commented 5 years ago

By running the test code below...

auto input_observable = rxcpp::observable<>::range(0, 9999);

auto output_observable = rxcpp::observable<>::defer(
    [input_observable]()
    {
        rxcpp::composite_subscription cs;
        auto coordination = rxcpp::serialize_same_worker(rxcpp::observe_on_new_thread().create_coordinator(cs).get_worker());

        // TODO: Define my per-subscription data here...

        return input_observable.map(
            [coordination](auto&& input)
            {
                // TODO: Handle my per-subscription data here...

                return rxcpp::observable<>::defer(
                    [input]()
                    {
                        // TODO: Do my heavy work on the worker thread, referring to my per-subscription data here...

                        return rxcpp::observable<>::just(input + 1);
                    }).subscribe_on(coordination);
            }).concat().finally([cs]() { cs.unsubscribe(); });
    });

int count = 0;
output_observable.as_blocking().subscribe(
    [&count](auto&& output)
    {
        if (output % 100 == 0) {
            std::cout << output << std::endl;
        }
        ++count;
    },
    [](auto&& exception)
    {
        std::cout << "on_error" << std::endl;
    },
    []()
    {
        std::cout << "on_completed" << std::endl;
    });

std::cout << "count: " << count << std::endl;

...the last line printed is expected to be "count: 10000". However, the program occasionally terminates unexpectedly or prints a smaller count such as "count: 9995". It always prints "count: 10000" when ".subscribe_on(coordination)" is removed.

Looking at rx-concat.hpp, it does not seem to support the "sink" handler and the "sinkInner" handler running on different threads. I tried a quick fix, and it now seems to work.

rx-concat.hpp.patch.txt

Can the concat operator be modified to support this multi-threaded situation, or is there a way to avoid the problem with the current version of the concat operator?

zued commented 5 years ago

Additional info: Changing .subscribe_on(coordination); to .subscribe_on(rxcpp::observe_on_new_thread()); or .subscribe_on(rxcpp::serialize_new_thread()); does not fix the problem. Changing the following line (observe_on_new_thread -> serialize_new_thread) does not fix it either.

    auto coordination = rxcpp::serialize_same_worker(rxcpp::serialize_new_thread().create_coordinator(cs).get_worker());

kirkshoop commented 5 years ago

Yes, by default concat uses the current-thread coordination, which is not thread-safe. When concat is used with sources on different threads, it must be given a thread-safe coordination to use instead of the default current-thread coordination. Provide the coordination to concat.
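As a rough standard-library analogy for why a serializing coordination matters here: the sketch below lets two threads (standing in for the outer source and an inner source) mutate one shared queue, but only through a single mutex. `Serializer` and `push_from_two_threads` are hypothetical names made up for this illustration; this is not how rxcpp's serialize_* coordinations are claimed to be implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical helper (not an rxcpp type): runs every callback under one mutex,
// so callers on different threads never touch the shared state concurrently.
class Serializer {
public:
    template <class F>
    void run(F&& f) {
        std::lock_guard<std::mutex> guard(lock_);
        f();
    }

private:
    std::mutex lock_;
};

// Two threads push into the same queue, but only through the serializer.
// Returns the final queue size, which should equal 2 * per_thread.
std::size_t push_from_two_threads(int per_thread) {
    assert(per_thread >= 0);
    std::deque<int> queue;  // shared state; not thread-safe on its own
    Serializer serializer;

    auto producer = [&](int base) {
        for (int i = 0; i < per_thread; ++i) {
            serializer.run([&] { queue.push_back(base + i); });
        }
    };

    std::thread outer(producer, 0);           // plays the outer source
    std::thread inner(producer, per_thread);  // plays an inner source
    outer.join();
    inner.join();
    return queue.size();
}
```

Calling `push_from_two_threads(10000)` should return 20000 every time. Without the lock, the two unsynchronized `push_back` calls would be a data race, which is the kind of lost update the report above describes.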

zued commented 5 years ago

Thank you. It worked after changing the concat call like this:

}).concat(rxcpp::serialize_new_thread()).finally([cs]() { cs.unsubscribe(); });

zued commented 4 years ago

Let me reopen this issue. I have encountered a similar problem: from a certain point on, the inner rxcpp::observable<>::defer is never called. A thread-safe coordination is passed to concat, as in concat(rxcpp::serialize_new_thread()).

The problem went away after I modified rx-concat.hpp. I would like to confirm whether I am still misusing something or whether rx-concat.hpp needs to be fixed. (Unfortunately, I have not managed to reproduce it with a simple test program.)

Below is an excerpt from rx-concat.hpp.

    void subscribe_to(collection_type st)
    {
        ...

        collectionLifetime.add(make_subscription([state, innercstoken](){
            // (*1)
            state->out.remove(innercstoken);
        }));

        ...

        auto sinkInner = make_subscriber<value_type>(
            ...
        //on_completed
            [state](){
                // (*2)
                if (!state->selectedCollections.empty()) {
                    auto value = state->selectedCollections.front();
                    state->selectedCollections.pop_front();
                    state->collectionLifetime.unsubscribe();
                    state->subscribe_to(value);
                } else if (!state->sourceLifetime.is_subscribed()) {
                    state->out.on_completed();
                }
            }
        );

        ...
    }

...

auto sink = make_subscriber<collection_type>(
            ...
// on_next
    [state](collection_type st) {
        // (*3)
        if (state->collectionLifetime.is_subscribed()) {
            state->selectedCollections.push_back(st);
        } else if (state->selectedCollections.empty()) {
            state->subscribe_to(st);
        }
    },
    ...
);

In the normal case, it moves from one inner observable to the next like this:

  1. [inner] on_completed of sinkInner (*2)
  2. [inner] (state->collectionLifetime is unsubscribed and?) state->out.remove(innercstoken); (*1) (I have not traced this completely, but I guess that "state->collectionLifetime is unsubscribed" and "state->out.remove(innercstoken); (*1)" happen as a kind of atomic operation.)
  3. [outer] on_next of sink (*3) ...

But it seems that (*1) and (*2) are not atomic. That is, occasionally (*3) is done before (*1).

As a result, it stops working properly, like this:

  1. [inner] on_completed of sinkInner (*2)
  2. [outer] on_next of sink (*3) --> state->selectedCollections.push_back(st); because state->collectionLifetime is still subscribed.
  3. [inner] (state->collectionLifetime is unsubscribed and?) state->out.remove(innercstoken); (*1)
  4. [outer] on_next of sink --> Does nothing because state->collectionLifetime is not subscribed and state->selectedCollections is not empty.
  5. [outer] on_next of sink --> Does nothing ... ...
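The failing order above can be replayed in a small single-threaded model. This is only a sketch: `ConcatModel` and `replay_failing_order` are hypothetical names, inner observables are plain integer ids, and it assumes that a late teardown of an already replaced lifetime is a no-op. The branch structure of (*2) and (*3) is copied from the excerpt.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Simplified single-threaded model of the concat state. Field names echo
// rx-concat.hpp, but this is an illustration, not the library code.
struct ConcatModel {
    int activeInner = -1;                 // inner whose collectionLifetime is live; -1 if none
    bool sourceSubscribed = true;         // stands in for sourceLifetime.is_subscribed()
    std::deque<int> selectedCollections;  // inner observables waiting their turn
    std::vector<int> started;             // inner observables that were subscribed

    void subscribe_to(int inner) {
        assert(activeInner == -1);        // only one inner may be active at a time
        activeInner = inner;
        started.push_back(inner);
    }

    // (*3) outer on_next, same branches as the excerpt
    void outer_on_next(int inner) {
        if (activeInner != -1) {
            selectedCollections.push_back(inner);
        } else if (selectedCollections.empty()) {
            subscribe_to(inner);
        }  // otherwise the item is silently ignored
    }

    // (*2) inner on_completed, same branches as the excerpt
    void inner_on_completed() {
        if (!selectedCollections.empty()) {
            int next = selectedCollections.front();
            selectedCollections.pop_front();
            activeInner = -1;             // collectionLifetime.unsubscribe()
            subscribe_to(next);
        } else if (!sourceSubscribed) {
            // out.on_completed() would be called here
        }
    }

    // (*1) teardown of a finished inner; assumed to be a no-op for stale lifetimes
    void inner_teardown(int inner) {
        if (activeInner == inner) activeInner = -1;
    }
};

// Replays the failing order: (*2), then (*3), then (*1).
ConcatModel replay_failing_order() {
    ConcatModel m;
    m.outer_on_next(0);      // inner 0 starts
    m.inner_on_completed();  // (*2): queue empty, source alive, nothing happens
    m.outer_on_next(1);      // (*3): lifetime still live, so inner 1 is queued
    m.inner_teardown(0);     // (*1): the lifetime goes away only now
    m.outer_on_next(2);      // ignored: no live lifetime, queue not empty
    m.outer_on_next(3);      // ignored
    return m;
}
```

After `replay_failing_order()`, only inner 0 has been started and inner 1 sits in the queue with nothing left to pull it out, which matches the stall described in steps 4 and 5.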

The problem seems to be fixed by calling state->collectionLifetime.unsubscribe() explicitly in on_completed of sinkInner, like this:

        auto sinkInner = make_subscriber<value_type>(
            ...
        //on_completed
            [state](){
                // (*2)
                if (!state->selectedCollections.empty()) {
                    auto value = state->selectedCollections.front();
                    state->selectedCollections.pop_front();
                    state->collectionLifetime.unsubscribe();
                    state->subscribe_to(value);
                } else if (!state->sourceLifetime.is_subscribed()) {
                    state->out.on_completed();
                } else {
                    // ***** Unsubscribe collectionLifetime explicitly *****
                    state->collectionLifetime.unsubscribe();
                }
            }
        );

Is this the right way to fix concat?
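To check the reasoning behind this change, here is a minimal single-threaded sketch that replays the same problematic order against a model with the extra else branch. `PatchedInnerModel` and `replay_with_inner_patch` are hypothetical names, inner observables are integer ids, and the model assumes that a late teardown of an already replaced lifetime is a no-op. It says nothing about other interleavings in the real operator.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Model of the concat state with the proposed change in the inner on_completed.
// Illustration only; not the library code.
struct PatchedInnerModel {
    int activeInner = -1;                 // inner whose collectionLifetime is live; -1 if none
    bool sourceSubscribed = true;         // stands in for sourceLifetime.is_subscribed()
    bool completed = false;               // set where out.on_completed() would be called
    std::deque<int> selectedCollections;  // inner observables waiting their turn
    std::vector<int> started;             // inner observables that were subscribed

    void subscribe_to(int inner) {
        assert(activeInner == -1);        // only one inner may be active at a time
        activeInner = inner;
        started.push_back(inner);
    }

    void outer_on_next(int inner) {       // (*3), unchanged
        if (activeInner != -1) {
            selectedCollections.push_back(inner);
        } else if (selectedCollections.empty()) {
            subscribe_to(inner);
        }
    }

    void inner_on_completed() {           // (*2) with the extra else branch
        if (!selectedCollections.empty()) {
            int next = selectedCollections.front();
            selectedCollections.pop_front();
            activeInner = -1;
            subscribe_to(next);
        } else if (!sourceSubscribed) {
            completed = true;
        } else {
            activeInner = -1;             // the proposed explicit unsubscribe
        }
    }

    void inner_teardown(int inner) {      // (*1); assumed no-op for stale lifetimes
        if (activeInner == inner) activeInner = -1;
    }
};

PatchedInnerModel replay_with_inner_patch() {
    PatchedInnerModel m;
    m.outer_on_next(0);          // inner 0 starts
    m.inner_on_completed();      // (*2): lifetime is released right away
    m.outer_on_next(1);          // (*3): nothing live, queue empty, inner 1 starts
    m.inner_teardown(0);         // (*1): late, but now harmless
    m.outer_on_next(2);          // queued behind inner 1
    m.inner_on_completed();      // inner 1 done, inner 2 starts
    m.outer_on_next(3);          // queued behind inner 2
    m.sourceSubscribed = false;  // outer source finishes
    m.inner_on_completed();      // inner 2 done, inner 3 starts
    m.inner_on_completed();      // inner 3 done, queue empty, source done
    return m;
}
```

In this model all four inner observables start in order and completion is reached, because the lifetime is already released when the outer on_next arrives.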

zued commented 4 years ago

FYI: I tried another approach, fixing the outer on_next handler, and it also seems to work.

auto sink = make_subscriber<collection_type>(
    ...
// on_next
    [state](collection_type st) {
        // (*3)
        if (state->collectionLifetime.is_subscribed()) {
            state->selectedCollections.push_back(st);
        } else if (state->selectedCollections.empty()) {
            state->subscribe_to(st);
        } else {
            // collectionLifetime is not subscribed but the queue is not empty.
            // Push the item into the queue, and restart subscription from the first item in the queue.
            state->selectedCollections.push_back(st);
            auto value = state->selectedCollections.front();
            state->selectedCollections.pop_front();
            state->subscribe_to(value);
        }
    },
    ...
);
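The same kind of single-threaded replay can be applied to this second variant. Again this is a sketch under assumptions: `PatchedOuterModel` and `replay_with_outer_patch` are hypothetical names, inner observables are integer ids, and a late teardown of a replaced lifetime is assumed to be a no-op. The flag `more_items_arrive` exists only in this sketch, to compare what happens with and without a further outer on_next.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Model of the concat state with the proposed change in the outer on_next.
// Illustration only; not the library code.
struct PatchedOuterModel {
    int activeInner = -1;                 // inner whose collectionLifetime is live; -1 if none
    bool sourceSubscribed = true;         // stands in for sourceLifetime.is_subscribed()
    std::deque<int> selectedCollections;  // inner observables waiting their turn
    std::vector<int> started;             // inner observables that were subscribed

    void subscribe_to(int inner) {
        assert(activeInner == -1);        // only one inner may be active at a time
        activeInner = inner;
        started.push_back(inner);
    }

    void outer_on_next(int inner) {       // (*3) with the extra else branch
        if (activeInner != -1) {
            selectedCollections.push_back(inner);
        } else if (selectedCollections.empty()) {
            subscribe_to(inner);
        } else {
            // Nothing is live but items are queued: enqueue, then restart from the front.
            selectedCollections.push_back(inner);
            int next = selectedCollections.front();
            selectedCollections.pop_front();
            subscribe_to(next);
        }
    }

    void inner_on_completed() {           // (*2), unchanged
        if (!selectedCollections.empty()) {
            int next = selectedCollections.front();
            selectedCollections.pop_front();
            activeInner = -1;
            subscribe_to(next);
        } else if (!sourceSubscribed) {
            // out.on_completed() would be called here
        }
    }

    void inner_teardown(int inner) {      // (*1); assumed no-op for stale lifetimes
        if (activeInner == inner) activeInner = -1;
    }
};

PatchedOuterModel replay_with_outer_patch(bool more_items_arrive) {
    PatchedOuterModel m;
    m.outer_on_next(0);          // inner 0 starts
    m.inner_on_completed();      // (*2): nothing happens
    m.outer_on_next(1);          // (*3): queued, lifetime still live
    m.inner_teardown(0);         // (*1): late
    if (more_items_arrive) {
        m.outer_on_next(2);      // new branch: queue 2, restart from inner 1
        m.inner_on_completed();  // inner 1 done, inner 2 starts
    }
    return m;
}
```

With `more_items_arrive` set to true, the model starts inner 0, 1 and 2 in order. With it set to false, inner 1 stays queued, so in this model the recovery depends on another outer on_next arriving. Whether that case can occur in rxcpp itself is not something this sketch establishes.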