ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.05k stars 395 forks source link

interval (with coordination) combined with flat_map and range, doesn't complete #416

Closed nikobarli closed 6 years ago

nikobarli commented 6 years ago

The following code only prints '42' once instead of two, and never completes.

        auto coordination = rxcpp::observe_on_new_thread();
        auto src1 = rxcpp::observable<>::interval(std::chrono::seconds(1), coordination);
        auto src2 = rxcpp::observable<>::range(42, 42);

        auto obs = src1.take(2).flat_map([src2](auto && v) { return src2; });

        obs.as_blocking().subscribe([](auto && val) { std::cout << val << std::endl; });

If I remove 'coordination' from 'src1' then two '42' are printed and the subscription completes.

kirkshoop commented 6 years ago

Thanks for the report!

this looks like a lifetime bug. when the second 42 is delivered to take() it should unsubscribe() the source and pass 42 to the flat_map then signal completed on the flat_map. flat_map should subscribe the range when it receives the 42.

ah.

try to pass the coordinator to both interval and range and see if that is a workaround.

If that works then the problem is that the worker that own the current thread when range is subscribed has already been cancelled when the range is subscribed so when it schedules the 42 to be delivered that is ignored.

need to think about that.

NOTE: this workaround is not good since each range subscribe will create a new thread. a better workaround would be to use synchronize_in_one_worker() to share the same thread between the interval and all the range.