ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Order of subscription affects with_latest_from #519

Closed (ElonH closed 4 years ago)

ElonH commented 4 years ago

Hi,

Here are two examples.

    rxcpp::subjects::subject<int> a, b, c, d;
    auto ao = a.get_observable();
    auto bo = b.get_observable();
    auto co = c.get_observable();
    auto doo = d.get_observable();

    auto w = co.with_latest_from(ao, bo).subscribe([&](const std::tuple<int, int, int> &e) {
        logger::debug("w");
        d.get_subscriber().on_next(4);
    });
    auto x = doo.subscribe([](const int &e) {
        logger::debug("x");
    });
    auto xb = doo.with_latest_from(bo).subscribe([](const std::tuple<int, int> &e) {
        logger::debug("xb");
    });
    auto xc = doo.with_latest_from(co).subscribe([](const std::tuple<int, int> &e) {
        logger::debug("xc");
    });
    auto xbc = doo.with_latest_from(bo, co).subscribe([](const std::tuple<int, int, int> &e) {
        logger::debug("xbc");
    });
    auto y = co.subscribe([](const int &e) {
        logger::debug("y");
    });

    a.get_subscriber().on_next(1);
    b.get_subscriber().on_next(2);
    c.get_subscriber().on_next(3);
/* console output
[15:18:55.902] [880,13932] [debug] w
[15:18:55.917] [880,13932] [debug] x
[15:18:55.918] [880,13932] [debug] xb
[15:18:55.918] [880,13932] [debug] y
*/

I expected all of the messages to be printed, but xc and xbc are missing.

Then I tried moving w after xbc, like this:

    auto x = doo.subscribe([](const int &e) {
        logger::debug("x");
    });
    auto xb = doo.with_latest_from(bo).subscribe([](const std::tuple<int, int> &e) {
        logger::debug("xb");
    });
    auto xc = doo.with_latest_from(co).subscribe([](const std::tuple<int, int> &e) {
        logger::debug("xc");
    });
    auto xbc = doo.with_latest_from(bo, co).subscribe([](const std::tuple<int, int, int> &e) {
        logger::debug("xbc");
    });
    auto w = co.with_latest_from(ao, bo).subscribe([&](const std::tuple<int, int, int> &e) {
        logger::debug("w");
        d.get_subscriber().on_next(4);
    });
    auto y = co.subscribe([](const int &e) {
        logger::debug("y");
    });

/* console output
[15:34:20.090] [6464,1064] [debug] w
[15:34:20.096] [6464,1064] [debug] x
[15:34:20.109] [6464,1064] [debug] xb
[15:34:20.121] [6464,1064] [debug] xc
[15:34:20.122] [6464,1064] [debug] xbc
[15:34:20.123] [6464,1064] [debug] y
*/

In my mind, the two examples above are equivalent, meaning that w, x, xb, xc, xbc, and y should be printed in both. I'm new to the RxCpp library; am I missing something? Do I need to take care of subscription order when using with_latest_from?

kirkshoop commented 4 years ago

Hi!

This is the expected behaviour, even though it is surprising at first glance.

You are correct that subscription order matters here. The subscriptions to a subject are kept in a queue and values are delivered to the subscriptions in order.

In this case the w subscription is the source of the doo signal. Thus, in the first example, the w subscription receives the co signal before the xc and xbc subscriptions and emits the doo signal before co has reached xc or xbc. with_latest_from gets the doo signal and has no value from co to emit, so nothing is emitted. In the second example, xc and xbc subscribe to co before w does, so they already hold the latest co value when w emits the doo signal, and both fire.
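
To make the ordering effect concrete, here is a minimal sketch in plain C++ that does not use RxCpp at all. `ToySubject`, the free function `with_latest_from`, and `run` are hypothetical names invented for this illustration; they only model the two properties described above (observers are called in subscription order, and `with_latest_from` emits nothing until the other source has produced a value) and are not how RxCpp is implemented.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a subject (an assumption, not RxCpp's class):
// observers are invoked synchronously, in the order they subscribed.
struct ToySubject {
    std::vector<std::function<void(int)>> observers;

    void subscribe(std::function<void(int)> f) { observers.push_back(std::move(f)); }

    void on_next(int v) {
        for (std::size_t i = 0; i < observers.size(); ++i) observers[i](v);
    }
};

// Toy with_latest_from: remembers the latest value seen on `other` and
// forwards a value from `source` only if `other` has already emitted.
void with_latest_from(ToySubject& source, ToySubject& other,
                      std::function<void(int, int)> f) {
    auto latest = std::make_shared<std::optional<int>>();
    other.subscribe([latest](int v) { *latest = v; });
    source.subscribe([latest, f](int v) {
        if (*latest) f(v, **latest);  // silently dropped when nothing is latched yet
    });
}

// Reduced version of the two examples: only w, xc, and y are modelled.
std::vector<std::string> run(bool w_first) {
    ToySubject c, d;
    std::vector<std::string> log;

    // w: reacts to c and re-enters d from inside c's delivery loop.
    auto add_w = [&] {
        c.subscribe([&](int) { log.push_back("w"); d.on_next(4); });
    };
    // xc: d combined with the latest value of c.
    auto add_xc = [&] {
        with_latest_from(d, c, [&](int, int) { log.push_back("xc"); });
    };

    if (w_first) { add_w(); add_xc(); } else { add_xc(); add_w(); }
    c.subscribe([&](int) { log.push_back("y"); });

    c.on_next(3);
    return log;
}
```

In this model, `run(true)` (w subscribed first) logs w, y, while `run(false)` (xc subscribed first) logs w, xc, y, which mirrors the difference between the two console outputs in the question.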