ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

zip(): does not complete if one of the observables does not complete #496

Open dbagaev opened 5 years ago

dbagaev commented 5 years ago

We use zip() to ensure that all the data sent to a non-Rx processing component has been processed. But since the non-Rx processing side never calls on_completed(), zip() never finishes.

If I draw it as a diagram, this is the situation when the second stream never calls on_completed():

1-2-3|
----A---B---C--
    |   |   |
----1A--2B--3C-

This is probably by design, but there is an inconsistency in the implementation: if the second (still running) stream then emits one extra element, zip() sends on_completed() anyway, as in this diagram:

1-2-3|
----A---B---C---D-
    |   |   |   |
----1A--2B--3C--|

From my point of view, the problem could be in the code that handles on_completed: it waits until all sources have completed (rx-zip.hpp:211):

auto& completed = std::get<Index>(state->pending).completed;
completed = true;
if (--state->pendingCompletions == 0) {
    state->out.on_completed();
}

On the other hand, the on_next() handler can complete the stream if any source has completed and has no buffered values left (rx-zip.hpp:201):

if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) {
    state->out.on_completed();
}

I think these two code paths are inconsistent and should be aligned. Either on_completed() from any source should emit all remaining complete tuples and then call the zip's own on_completed() instead of waiting for the other streams to complete, or, vice versa, on_next() should not send on_completed() just because some other source has completed.
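To make the two completion paths concrete, here is a small framework-free toy model of a two-source zip. It is plain C++, not RxCpp code, and the class ToyZip2 and its members are hypothetical. With eager == false it mimics the reported behaviour: on_completed() waits for every source, while on_next() completes once a completed source has no buffered values left. That on_next() checks for a drained source before pairing is an assumption chosen so the model reproduces both diagrams above; it is not read from rx-zip.hpp. With eager == true it sketches the first proposal: zip completes as soon as any completed source is drained.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model of a two-source zip (plain C++, NOT RxCpp code).
// eager == false: mimics the reported behaviour.
// eager == true:  sketches the proposal from this issue.
class ToyZip2 {
public:
    explicit ToyZip2(bool eager) : eager_(eager) {}

    void on_next(int source, std::string value) {
        if (done_) return;
        values_[source].push_back(std::move(value));
        // Assumption: the "completed and drained" check runs BEFORE pairing,
        // so a source drained by the previous tuple is only noticed now.
        if (any_completed_and_drained()) { done_ = true; return; }
        if (!values_[0].empty() && !values_[1].empty()) {
            tuples_.emplace_back(values_[0].front(), values_[1].front());
            values_[0].pop_front();
            values_[1].pop_front();
        }
        // Proposal: notice a drained, completed source right after pairing.
        if (eager_ && any_completed_and_drained()) done_ = true;
    }

    void on_completed(int source) {
        if (done_) return;
        completed_[source] = true;
        --pending_completions_;
        // Reported: wait for ALL sources. Proposal: stop once nothing can pair.
        if (pending_completions_ == 0 || (eager_ && any_completed_and_drained())) done_ = true;
    }

    bool done() const { return done_; }
    const std::vector<std::pair<std::string, std::string>>& tuples() const { return tuples_; }

private:
    // True if some source has completed and has no buffered values left.
    bool any_completed_and_drained() const {
        for (int k = 0; k < 2; ++k)
            if (completed_[k] && values_[k].empty()) return true;
        return false;
    }

    bool eager_;
    std::deque<std::string> values_[2];   // buffered, not-yet-zipped items per source
    bool completed_[2] = {false, false};
    int pending_completions_ = 2;         // sources that have not completed yet
    bool done_ = false;                   // has the zip output completed?
    std::vector<std::pair<std::string, std::string>> tuples_;
};
```

Replaying the second diagram, the non-eager model emits 1A, 2B, 3C and only completes when D arrives, while the eager variant completes right after 3C.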

This issue is related to #236. There is probably some logic behind this behaviour that I don't understand; if so, I would be happy to get an explanation. Thank you in advance!

quetzalcoatl commented 4 years ago

That just bit me as well.

In my case, I had several observables and wanted to detect when any of them completes. The items were irrelevant; ideally there would be no items at all, just completion as soon as any of the N sources completes. The problem is similar to what the amb/race operator solves, but amb requires at least one item to be sent over the line...

After reviewing the available operators, ZIP looked ideal for this: it requires an element from each source, so I naturally assumed that the ZIP operator terminates the stream as soon as any source completes. After all, there's no chance of emitting any further elements once any of the sources has completed and won't provide anything more.

However, that doesn't seem to happen:

TEST(RXX, Zip_Completes_WhenBothSources_Complete)
{
    RXB::subject<std::string> subject1;
    RXB::subject<std::string> subject2;
    RXX::observable observable = subject1.get_observable().zip(subject2.get_observable());

    bool finished = false;
    observable.subscribe(
        [](auto item) {},
        [](std::exception_ptr e) { GTEST_FAIL() << "unexpected exception"; },
        [&finished]() { finished = true; }
    );

    subject1.get_subscriber().on_completed();
    subject2.get_subscriber().on_completed();

    EXPECT_TRUE(finished);  // PASS!
}

TEST(RXX, Zip_Completes_WhenJustOneSource_Completes)
{
    RXB::subject<std::string> subject1;
    RXB::subject<std::string> subject2;
    RXX::observable observable = subject1.get_observable().zip(subject2.get_observable());

    bool finished = false;
    observable.subscribe(
        [](auto item) {},
        [](std::exception_ptr e) { GTEST_FAIL() << "unexpected exception"; },
        [&finished]() { finished = true; }
    );

    subject1.get_subscriber().on_completed();    // complete just ONE source...
    //subject2.get_subscriber().on_completed();  // ...or just this one instead: still fails

    EXPECT_TRUE(finished); // FAIL!
}

That surprised me! But then, aside from items and completion, we also have... errors:

TEST(RXX, Zip_Fails_WhenFirstSource_Completes_AndSecond_Fails)
{
    RXB::subject<std::string> subject1;
    RXB::subject<std::string> subject2;
    RXX::observable observable = subject1.get_observable().zip(subject2.get_observable());

    bool failed = false;
    bool finished = false;
    observable.subscribe(
        [](auto item) { GTEST_FAIL() << "unexpected item"; },
        [&failed](std::exception_ptr e) { failed = true; },
        [&finished]() { finished = true; }
    );

    subject1.get_subscriber().on_completed();

    subject2.get_subscriber().on_error(std::exception_ptr());

    EXPECT_FALSE(finished);   // PASS!
    EXPECT_TRUE(failed);    // PASS!
}

From the errors' point of view, it behaved fine. One source has completed, but an error could still arrive through the other lane, so ZIP waits until that other lane completes. Only then are we sure that there will be no more items and no more errors, so ZIP's output can complete...

Similarly, the "inconsistent" case observed by dbagaev:

TEST(RXX, Zip_Completes_WhenFirstSource_Completes_AndSecond_Emits)
{
    RXB::subject<std::string> subject1;
    RXB::subject<std::string> subject2;
    RXX::observable observable = subject1.get_observable().zip(subject2.get_observable());

    bool failed = false;
    bool finished = false;
    observable.subscribe(
        [](auto item) { GTEST_FAIL() << "unexpected item"; },
        [&failed](std::exception_ptr e) { failed = true; },
        [&finished]() { finished = true; }
    );

    subject1.get_subscriber().on_completed();
    subject2.get_subscriber().on_next("11");

    EXPECT_TRUE(finished); // PASS!
}

Here, the first source completed, but ZIP did not: it still waits for an error that could arrive from the other source. However, the other source emitted an item. That means that at the "point in time" when the first source completed, there was no error on source2, because source2 delivered an item afterwards, right? So no error, and ZIP completes.

From my point of view, this looks like a very careful and correct design. ZIP is all about observing position-synchronized items from the sources, so what I/we should be looking at is the behavior from the point of view of the "state" of one such tuple:

format: input + input -> output

item + (waiting) -> (waiting)       - obvious
item + item      -> (item,item)     - obvious
item + error     -> error           - obvious
end  + end       -> end             - obvious
end  + (waiting) -> (waiting)       - our gotcha here, we expected END, not WAIT
end  + error     -> error           - we didn't consider that!
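The table can be written down as a tiny pure function of one tuple slot. The enums In/Out and the name zip_slot are hypothetical, not RxCpp types. Rows that are not in the table are added as assumptions: an error on either lane is assumed to win immediately, and end + item -> end corresponds to the "inconsistent" test above.

```cpp
#include <cassert>

// Hypothetical encoding of the table above (names are illustrative, not RxCpp).
enum class In { Waiting, Item, End, Error };
enum class Out { Waiting, Tuple, End, Error };

// State of one tuple "slot", given what each of the two inputs has delivered.
Out zip_slot(In a, In b) {
    // item + error, end + error: an error on either lane wins (assumed symmetric).
    if (a == In::Error || b == In::Error) return Out::Error;
    // item + item -> (item,item)
    if (a == In::Item && b == In::Item) return Out::Tuple;
    // end + end -> end
    if (a == In::End && b == In::End) return Out::End;
    // end + item -> end: the item proves no error came first (not in the table;
    // it corresponds to the "inconsistent" test above).
    if ((a == In::End && b == In::Item) || (a == In::Item && b == In::End)) return Out::End;
    // item + (waiting), end + (waiting): keep waiting, which is the gotcha.
    return Out::Waiting;
}
```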

Consider the last three lines, which describe how "end + ..." behaves. Seen this way, it looks like a design decision regarding error handling.


OK, if it's not a bug/oversight in ZIP (I still kinda "feel" that ZIP should complete ASAP) and it's by design, then... do we have any operator that quickly detects the first completion among N sources? zip does not, combine_latest does not, amb/race does not... ooooh... the RACE...

amb/race picks a stream depending on which stream emits first, but it needs an item to be emitted. If a stream just completes, amb/race ignores it and waits for items from the other streams. That's why I couldn't use this operator to detect which stream completes first... but I can patch it with concat! I can easily convert a completing stream into a one-element stream by concatenating it with a single constant dummy item.

TEST(RXX, AmbConcatDummy_Completes_WhenJustOneSource_Completes)
{
    RXB::subject<std::string> subject1;
    RXB::subject<std::string> subject2;
    RXX::observable observable =
        subject1.get_observable().concat(RXS::just(std::string{}))
        .amb(subject2.get_observable().concat(RXS::just(std::string{})))
        .ignore_elements(); // I'm really NOT interested in items, just the completion

    bool finished = false;
    observable.subscribe(
        [](auto item) { GTEST_FAIL() << "unexpected item"; },
        [](std::exception_ptr e) { GTEST_FAIL() << "unexpected exception"; },
        [&finished]() { finished = true; }
    );

    subject1.get_subscriber().on_completed();    // complete just ONE source...
    //subject2.get_subscriber().on_completed();  // ...or just this one instead: still passes

    EXPECT_TRUE(finished); // PASS!
}
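The trick can be modelled without Rx at all. In this toy (the Event/Kind types and both functions are hypothetical, not RxCpp), toy_amb_winner follows the amb behaviour described above: only an item can win the race, and a bare completion is ignored. concat_dummy mimics concat(just(dummy)) by putting one dummy item in front of every completion.

```cpp
#include <cassert>
#include <optional>
#include <vector>

// Hypothetical event model: which source did what, in arrival order.
enum class Kind { Item, Completed };
struct Event { int source; Kind kind; };

// Toy amb as described above: the first source to emit an ITEM wins;
// a bare completion before any item is ignored.
// Returns the winning source, or nullopt if no source emitted an item.
std::optional<int> toy_amb_winner(const std::vector<Event>& events) {
    for (const Event& e : events) {
        if (e.kind == Kind::Item) return e.source;
    }
    return std::nullopt;
}

// Toy version of `source.concat(just(dummy))`: every completion is preceded
// by one dummy item, so completing now counts as "emitting first".
std::vector<Event> concat_dummy(const std::vector<Event>& events) {
    std::vector<Event> out;
    for (const Event& e : events) {
        if (e.kind == Kind::Completed) out.push_back({e.source, Kind::Item});
        out.push_back(e);
    }
    return out;
}
```

For a timeline in which source 1 completes and source 0 emits an item afterwards, plain amb picks source 0, while the concat-dummy version picks source 1, the one that completed first.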

And since my original problem was take_until, I can even drop ignore_elements(), because take_until only needs its trigger to emit once:

    source
        .take_until(
            sourceToWatch1.concat(just(dummy))
            .amb(sourceToWatch2.concat(just(dummy)))
            .amb(sourceToWatch3.concat(just(dummy)))
            ....
        )
        ...
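Why ignore_elements() becomes unnecessary can be seen in a toy take_until (the Tick type and toy_take_until are hypothetical and model one merged timeline; this is not RxCpp code): the trigger's first item ends the output, and its value is never looked at, so the dummy item is harmless.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical merged timeline: each tick is either an item from `source`
// or an item from the trigger passed to take_until.
struct Tick { bool from_trigger; std::string value; };

// Toy take_until: forward source items until the trigger emits its first
// item. The trigger's VALUE is never inspected.
std::vector<std::string> toy_take_until(const std::vector<Tick>& timeline) {
    std::vector<std::string> out;
    for (const Tick& t : timeline) {
        if (t.from_trigger) break;   // first trigger item ends the output
        out.push_back(t.value);
    }
    return out;
}
```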

And now my problem is solved: I can detect end-of-stream without waiting for the other streams, I understand AMB and ZIP a bit better, and I've probably just written most of a tutorial piece. Feel free to use any of it in the docs if you find it useful.

But dear all, we could really use a merge/zip/race/amb-like operator that just detects completion :)
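As a rough idea of what such an operator would need to do, here is a framework-free sketch. The class FirstCompletion is hypothetical and not part of RxCpp: it ignores items and signals exactly once, on the first completion or error from any watched source. Wiring each real source's on_completed/on_error to it, together with that source's index, is left to the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <utility>

// Hypothetical sketch of the wished-for operator (NOT an RxCpp API):
// items are ignored, and exactly one signal is delivered, as soon as ANY
// watched source completes or errors.
class FirstCompletion {
public:
    FirstCompletion(std::function<void(std::size_t)> on_first_completed,
                    std::function<void(std::exception_ptr)> on_error)
        : on_first_completed_(std::move(on_first_completed)),
          on_error_(std::move(on_error)) {}

    template <class T>
    void on_next(std::size_t, const T&) {}  // items are irrelevant

    void on_completed(std::size_t source) {
        if (fired_) return;                 // only the first terminal event counts
        fired_ = true;
        on_first_completed_(source);
    }

    void on_error(std::exception_ptr e) {
        if (fired_) return;
        fired_ = true;
        on_error_(e);
    }

private:
    std::function<void(std::size_t)> on_first_completed_;
    std::function<void(std::exception_ptr)> on_error_;
    bool fired_ = false;
};
```

Unlike the amb + concat workaround, nothing has to be emitted, so no dummy value is needed.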