ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

observable.replay() seems to forget items on underlying stream's completion #539

Open quetzalcoatl opened 4 years ago

quetzalcoatl commented 4 years ago
    RXB::subject<bool> _flag;
    auto flag = _flag.get_observable().replay();

    _flag.get_subscriber().on_next(true);
    _flag.get_subscriber().on_completed();

    auto zz = flag.as_blocking().count();  // DEADLOCK

Forgive me, but at first I simply didn't notice that replay() returns a connectable observable.
It's obvious only once you know it :)
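The reason the first snippet deadlocks can be sketched with a toy model in plain C++. This is not RxCpp's implementation, and the names `Observer`, `ToyConnectable` and `count_without_connect` are hypothetical. A connectable forwards nothing until `connect()` is called, so a subscriber like the one behind `as_blocking().count()` never receives `on_completed` and waits forever.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Toy observer: plain callbacks, no error channel (hypothetical, not RxCpp).
struct Observer {
    std::function<void(bool)> on_next;
    std::function<void()> on_completed;
};

// Toy connectable: subscribers are registered, but upstream events are
// dropped until connect() has been called.
class ToyConnectable {
public:
    void subscribe(Observer o) { observers_.push_back(std::move(o)); }
    void connect() { connected_ = true; }

    void upstream_next(bool v) {
        if (!connected_) return;  // not connected: the item goes nowhere
        for (auto& o : observers_) o.on_next(v);
    }
    void upstream_completed() {
        if (!connected_) return;  // not connected: the completion is lost too
        for (auto& o : observers_) o.on_completed();
    }

private:
    std::vector<Observer> observers_;
    bool connected_ = false;
};

struct CountResult {
    std::size_t count = 0;
    bool completed = false;
};

// Mirrors the first snippet: connect() is never called.
CountResult count_without_connect() {
    ToyConnectable flag;
    CountResult r;
    // Subscribe first, so the subscriber is definitely present when events fire.
    flag.subscribe({[&r](bool) { ++r.count; }, [&r] { r.completed = true; }});
    flag.upstream_next(true);
    flag.upstream_completed();
    return r;  // completed stays false: a blocking count() would wait forever
}
```

In this model, calling `flag.connect()` before `upstream_next` delivers both events, which is consistent with the explicit-`connect()` variant no longer deadlocking.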

I only noticed it's connectable when I found .replay().ref_count() on some forum.
I tried that: the deadlock was gone (so at least the completion arrived), but the items seem to be gone as well.

    RXB::subject<bool> _flag;
    auto flag = _flag.get_observable().replay().ref_count();

    _flag.get_subscriber().on_next(true);
    _flag.get_subscriber().on_completed();

    auto zz = flag.as_blocking().count();  // no deadlock, but zz == 0 instead of expected 1! what!

...and that's it. I fully expected to see 1 here.
By the way, as_blocking() is only here to simplify things. The original code was multi-threaded and malfunctioned somewhere around .replay(), so I started simplifying it to find out why items are missing from the (completed, replayed) observable.

Anyway, the first, deadlocking piece of code shows what I tried to achieve: raise a flag (on_next), keep it raised (replay), and let everyone know it has finished changing (on_completed). I need on_next because elsewhere I want to use the flag with .take_until() to stop something when the flag is set. I need replay because I don't know whether the stoppable thing is already running, or whether it will try to start a few minutes later. I need on_completed because elsewhere I want to collect and inspect all the flags, ideally with concat. Merge would do as well, but concat would make things easier for me by preserving the in-code order, as opposed to the runtime order produced by merge. Either way, both concat and merge need their underlying sources to complete, otherwise they keep waiting.

I tried the workaround from @kirkshoop in #412, .replay().take(3).ref_count(), and it does not compile: take() doesn't return a connectable, so ref_count() can't follow it. Bummer.

Let's try something similar; genetic programming for the win!

Adding take - i.e. .replay().ref_count().take(1) or .take(1).replay().ref_count() - didn't change anything.
Adding a limit to replay - .replay(5).ref_count() - didn't change anything.
Adding both - i.e. .replay(5).ref_count().take(1) or .take(1).replay(5).ref_count() - didn't change anything.

Then I tried to rationalize: "OK, I added ref_count(), but at the moment of sending and completing there weren't any subscribers yet, so in theory replay() could still have been disconnected*), all the initial items could have been flying into the void**), and it didn't deadlock only because the subject's internal observable remembered the completion." So let's try again, with an explicit connect():

*) Based on http://reactivex.io/documentation/operators/refcount.html, ref_count subscribes immediately, but actually calls connect() only when it gets its own first subscriber.
**) As we'll see later, this seems to be completely untrue; probably the exact opposite happens.
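Taking footnote *) at face value (connect on the first subscriber), and assuming, as suspected above, that the subject remembers its completion, a toy model reproduces the zero count of the ref_count snippet. It is plain C++ with hypothetical names (`ToySubject`, `ToyReplayRefCount`), not RxCpp internals: the subject emits `true` while nothing is subscribed to it, and when the first subscriber finally triggers the connection, only the remembered completion is left to deliver. Subscribing before sending yields the expected 1 in the same model.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

struct Observer {
    std::function<void(bool)> on_next;
    std::function<void()> on_completed;
};

// Toy hot subject: items reach current observers only; completion is
// remembered, so late subscribers are completed immediately.
class ToySubject {
public:
    void subscribe(Observer o) {
        if (completed_) { o.on_completed(); return; }  // items are long gone
        observers_.push_back(std::move(o));
    }
    void on_next(bool v) { for (auto& o : observers_) o.on_next(v); }
    void on_completed() {
        completed_ = true;
        for (auto& o : observers_) o.on_completed();
        observers_.clear();
    }

private:
    std::vector<Observer> observers_;
    bool completed_ = false;
};

// Toy replay() + ref_count(): subscribes to the source (connects) only when
// its first own subscriber arrives, then replays whatever it has recorded.
class ToyReplayRefCount {
public:
    explicit ToyReplayRefCount(ToySubject& source) : source_(source) {}

    void subscribe(Observer o) {
        if (!connected_) {
            connected_ = true;
            source_.subscribe({[this](bool v) {
                                   buffer_.push_back(v);
                                   for (auto& d : live_) d.on_next(v);
                               },
                               [this] {
                                   done_ = true;
                                   for (auto& d : live_) d.on_completed();
                                   live_.clear();
                               }});
        }
        for (bool v : buffer_) o.on_next(v);  // replay recorded items
        if (done_) { o.on_completed(); return; }
        live_.push_back(std::move(o));
    }

private:
    ToySubject& source_;
    std::vector<bool> buffer_;
    std::vector<Observer> live_;
    bool connected_ = false;
    bool done_ = false;
};

struct CountResult {
    std::size_t count = 0;
    bool completed = false;
};

// Second snippet: send and complete first, subscribe afterwards.
CountResult count_sent_before_subscribe() {
    ToySubject subject;
    ToyReplayRefCount flag(subject);
    subject.on_next(true);   // replay not connected yet: the item is dropped
    subject.on_completed();  // the subject keeps only the completion
    CountResult r;
    flag.subscribe({[&r](bool) { ++r.count; }, [&r] { r.completed = true; }});
    return r;
}

// Same operators, but the subscriber (and thus the connection) comes first.
CountResult count_subscribed_before_send() {
    ToySubject subject;
    ToyReplayRefCount flag(subject);
    CountResult r;
    flag.subscribe({[&r](bool) { ++r.count; }, [&r] { r.completed = true; }});
    subject.on_next(true);
    subject.on_completed();
    return r;
}
```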

    RXB::subject<bool> _flag;
    auto flag = _flag.get_observable().replay();
    auto sub = flag.connect(); // let's make sure it IS connected before we try next()

    _flag.get_subscriber().on_next(true);
    _flag.get_subscriber().on_completed();

    auto zz = flag.as_blocking().count();  // nope, still ZERO, at least no deadlock thanks to connect()

Finally, I started doubting .count(). I rarely use it; maybe I don't know how to use it properly.
At this point I was out of ideas, so let's try observing everything!

    std::vector<bool> captured1; // raw observable, subscribed early
    std::vector<bool> captured2; // replay connected early and subscribed early
    std::vector<bool> captured3; // replay connected early and subscribed late but before completion
    std::vector<bool> captured4; // replay connected early and subscribed very late, after completion

    RXB::subject<bool> _flag;
    auto flag = _flag.get_observable();

    auto replayed = flag.replay();
    auto subs = replayed.connect();

    flag.subscribe([&captured1](auto item) { captured1.push_back(item); });
    replayed.subscribe([&captured2](auto item) { captured2.push_back(item); }); // subscribe BEFORE sending

    _flag.get_subscriber().on_next(true);

    replayed.subscribe([&captured3](auto item) { captured3.push_back(item); }); // subscribe AFTER sending

    _flag.get_subscriber().on_completed();

    replayed.subscribe([&captured4](auto item) { captured4.push_back(item); }); // try being totally late to the party

results:

I'd say it's pretty narrowed down to just two possibilities:

a) Replay seems to forget its cached items when the underlying source stream completes.
b) Replay seems to forget its cached items when all current subscribers are gone.
Well, or a+b.

Regarding (b): it isn't immediately visible in the code, but I guess the first two subscriptions actually unsubscribe themselves when they get the 'completed' event from Replay. That leaves Replay temporarily with no subscribers until the late-to-the-party guy subscribes, but by then it's too late for him.
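Possibility (b) can be written down as a hypothetical toy model. This is not RxCpp's actual code, and `ToyForgetfulReplay` and `run_hypothesis_b` are made-up names. Subscribers detach when they receive `on_completed`, and the buffer is dropped as soon as the live-subscriber count falls from non-zero to zero. The upstream subject is folded into `upstream_next`/`upstream_completed`, i.e. the replay is assumed to be connected already. Run on the experiment's replay sequence, it gives the item to the subscribers that arrived before completion and nothing to the late one.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

struct Observer {
    std::function<void(bool)> on_next;
    std::function<void()> on_completed;
};

// HYPOTHETICAL model of possibility (b), not RxCpp's implementation.
class ToyForgetfulReplay {
public:
    void upstream_next(bool v) {
        buffer_.push_back(v);
        for (auto& o : live_) o.on_next(v);
    }
    void upstream_completed() {
        done_ = true;
        std::vector<Observer> leaving;
        leaving.swap(live_);  // every subscriber detaches on completion
        for (auto& o : leaving) o.on_completed();
        // Hypothesis (b): the last subscriber just left, so the buffer is dropped.
        if (!leaving.empty()) buffer_.clear();
    }
    void subscribe(Observer o) {
        for (bool v : buffer_) o.on_next(v);            // replay what is left
        if (done_) { o.on_completed(); return; }        // completion is still remembered
        live_.push_back(std::move(o));
    }

private:
    std::vector<bool> buffer_;
    std::vector<Observer> live_;
    bool done_ = false;
};

struct Captures {
    std::vector<bool> early, before_completion, after_completion;
};

// The experiment's replay subscriptions, run against the hypothetical model.
Captures run_hypothesis_b() {
    Captures c;
    ToyForgetfulReplay replayed;
    auto into = [](std::vector<bool>& v) {
        return Observer{[&v](bool item) { v.push_back(item); }, [] {}};
    };
    replayed.subscribe(into(c.early));              // like captured2
    replayed.upstream_next(true);
    replayed.subscribe(into(c.before_completion));  // like captured3
    replayed.upstream_completed();
    replayed.subscribe(into(c.after_completion));   // like captured4
    return c;
}
```

Under this particular model a replay that had no subscribers at completion keeps its buffer, so on its own it does not account for the zero count of the explicit-connect snippet, where no subscriber existed before completion.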

I have no idea whether that's by design or a bug, but it looks like a bug to me. It's a "Replay" operator; I see no reason to forget what is meant to be replayed just because the source stream completed. If it's by design, please explain the rationale behind it, and please suggest a solution other than "just don't complete it, silly" ;) since I really need to concat/merge/etc. the flags later.
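For comparison, here is a toy model of a replay that keeps its buffer after the source completes, which is the behaviour expected here. It is plain C++ with hypothetical names (`ToySubject`, `ToyReplay`, `run_expected`), not RxCpp. Running the experiment's sequence against it leaves exactly one `true` in each of the four captures, including the one subscribed after completion.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

struct Observer {
    std::function<void(bool)> on_next;
    std::function<void()> on_completed;
};

// Toy hot subject: items reach current observers only; completion is remembered.
class ToySubject {
public:
    void subscribe(Observer o) {
        if (completed_) { o.on_completed(); return; }
        observers_.push_back(std::move(o));
    }
    void on_next(bool v) { for (auto& o : observers_) o.on_next(v); }
    void on_completed() {
        completed_ = true;
        for (auto& o : observers_) o.on_completed();
        observers_.clear();
    }

private:
    std::vector<Observer> observers_;
    bool completed_ = false;
};

// Toy replay with explicit connect(): records every item and keeps the record
// after completion, replaying it (plus the completion) to later subscribers.
class ToyReplay {
public:
    explicit ToyReplay(ToySubject& source) : source_(source) {}

    void connect() {
        source_.subscribe({[this](bool v) {
                               buffer_.push_back(v);
                               for (auto& d : live_) d.on_next(v);
                           },
                           [this] {
                               done_ = true;
                               for (auto& d : live_) d.on_completed();
                               live_.clear();  // subscribers leave, buffer stays
                           }});
    }
    void subscribe(Observer o) {
        for (bool v : buffer_) o.on_next(v);  // replay history first
        if (done_) { o.on_completed(); return; }
        live_.push_back(std::move(o));
    }

private:
    ToySubject& source_;
    std::vector<bool> buffer_;
    std::vector<Observer> live_;
    bool done_ = false;
};

struct Captures {
    std::vector<bool> raw_early, replay_early, replay_after_next, replay_after_completed;
};

// Same order of operations as the captured1..captured4 experiment.
Captures run_expected() {
    Captures c;
    ToySubject flag;
    ToyReplay replayed(flag);
    replayed.connect();
    auto into = [](std::vector<bool>& v) {
        return Observer{[&v](bool item) { v.push_back(item); }, [] {}};
    };
    flag.subscribe(into(c.raw_early));
    replayed.subscribe(into(c.replay_early));
    flag.on_next(true);
    replayed.subscribe(into(c.replay_after_next));
    flag.on_completed();
    replayed.subscribe(into(c.replay_after_completed));
    return c;
}
```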

All of this was tested on 4.1.0 from vcpkg.
Unfortunately, right now I can't test it on master or other versions.