ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.05k stars 395 forks source link

replay triggers std::terminate if it uses observe_on_new_thread coordinator and has two subscribers #465

Open javaJake opened 6 years ago

javaJake commented 6 years ago

Reproduction:

int main ()
{
    rxcpp::observable<int> data = rxcpp::observable<>::range<int>(1,4);
    rxcpp::connectable_observable<int> dataReplaying = data.replay(rxcpp::observe_on_new_thread());

    dataReplaying.subscribe([](int anInt){
        std::cout << "1: " << anInt << std::endl;
    });

    dataReplaying.subscribe([](int anInt){
        std::cout << "2: " << anInt << std::endl;
    });

    rxcpp::composite_subscription compositeSubscription;
    dataReplaying.connect(compositeSubscription);

    return EXIT_SUCCESS;
}

I expect this code to print as many numbers as it can before the compositeSubscription object destructs and causes all to unsubscribe. Instead, I get a failure on this line:

if (!keepAlive->q.empty()) std::terminate();