ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Flat map serializes output when using observable<>::create #575

Open JavierBejMen opened 2 years ago

JavierBejMen commented 2 years ago

Hi all! We ran into this issue with the flat_map operator. As described in the documentation, flat_map internally merges the emissions of the observables returned by the CollectionSelector function. When the CollectionSelector returns a range, the output is as expected, with values from the inner observables interleaved:

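#include <cstdio>
#include <tuple>
#include <gtest/gtest.h>
#include <rxcpp/rx.hpp>

TEST(RxcppThreading, FlatMap){
    auto values = rxcpp::observable<>::range(1, 3)
        .flat_map(
            // CollectionSelector: one inner range per outer value
            [](int /*v*/){
                return rxcpp::observable<>::range(1, 4);
            },
            // ResultSelector: pair each inner value with its outer value
            [](int v_main, int v_sub){
                return std::make_tuple(v_main, v_sub);
            });

    values.subscribe(
            [](std::tuple<int, int> v){std::printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
            [](){std::printf("OnCompleted\n");});
}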

Output:

OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 2 - 1
OnNext: 1 - 3
OnNext: 2 - 2
OnNext: 3 - 1
OnNext: 1 - 4
OnNext: 2 - 3
OnNext: 3 - 2
OnNext: 2 - 4
OnNext: 3 - 3
OnNext: 3 - 4

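But when the CollectionSelector uses the create function instead, the output is serialized per source observable: every value of one inner observable is emitted before the next inner observable starts: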

#include <cstdio>
#include <tuple>
#include <gtest/gtest.h>
#include <rxcpp/rx.hpp>

TEST(RxcppThreading, FlatMapCreate){
    auto values = rxcpp::observable<>::range(1, 3)
        .flat_map(
            [](int /*v*/){
                // The whole for loop runs synchronously inside subscribe
                return rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s){
                    for (auto i = 0; i < 4; ++i){
                        if (!s.is_subscribed())
                            break;
                        s.on_next(i);
                    }
                    s.on_completed();
                });
            },
            [](int v_main, int v_sub){
                return std::make_tuple(v_main, v_sub);
            });

    values.subscribe(
            [](std::tuple<int, int> v){std::printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
            [](){std::printf("OnCompleted\n");});
}

Output:

OnNext: 1 - 0
OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 1 - 3
OnNext: 2 - 0
OnNext: 2 - 1
OnNext: 2 - 2
OnNext: 2 - 3
OnNext: 3 - 0
OnNext: 3 - 1
OnNext: 3 - 2
OnNext: 3 - 3

Is this the expected behavior, or is there something we are missing? We need to perform the flat_map operation using create without the emissions being serialized. If someone could help us work around this, we would be very grateful.

Thanks in advance! 😃

victimsnino commented 2 years ago

It looks like this happens because the range observable schedules its emissions under the hood via identity_current_thread instead of identity_immediate:

    template<class T>
    static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
        -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
        return      rxs::range<T>(first, last, step, identity_current_thread());
    }

I would have expected the second output to be the "canonical" one, because flat_map simply subscribes to each inner observable. In both of your cases the observables are synchronous (no direct use of threads, etc.), so subscribe should block until on_completed. For this reason the second result is correct. To get "non-serialized" output in the second scenario, you need to provide some scheduling manually.

@kirkshoop, what is the reason for using current_thread instead of immediate for the range observable?

kirkshoop commented 2 years ago

current_thread is used so that multiple ranges interleave values even on a single thread. Without interleaving, some operators can end up with unbounded allocations (zip is one culprit).

The create usage here blocks in the subscribe function until the for loop completes. To get interleaving, a loop cannot be used in create. Instead, there must be some state, plus a scheduled function that sends one value, updates the state, and reschedules itself.