ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

Segfault when combine publish/repeat/zip #533

Open bornovalov opened 4 years ago

bornovalov commented 4 years ago
auto s = rxcpp::observable<>::just(1).publish();

rxcpp::observable<>::from(1,2,3,4)
    .zip(s.repeat())
    .subscribe([](std::tuple<int, int> const&){});

s.connect();

or

auto s = rxcpp::observable<>::just(1).publish();

rxcpp::observable<>::from(1,2,3,4)
    .merge(s.repeat().take(4))
    .subscribe([](int){});

s.connect();

v4.1.0

Without publish/connect this code works as expected.

In real life i have one shared resource s (as shared_ptr), and zip it with several task contexts. Maybe i doing something wrong, and this is expected behavior?

My environment:

▶ uname -a
Darwin macbook-4106 19.3.0 Darwin Kernel Version 19.3.0: Thu Jan  9 20:58:23 PST 2020; root:xnu-6153.81.5~1/RELEASE_X86_64 x86_64
▶ /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/clang++ --version
Apple clang version 11.0.0 (clang-1100.0.33.12)
daixtrose commented 4 years ago

Do you have full code example that can be compiled?

bornovalov commented 4 years ago
#include <rxcpp/rx.hpp>

int main()
{
    auto s = rxcpp::observable<>::just(1).publish();

    rxcpp::observable<>::from(1,2,3,4)
        .zip(s.repeat())
        .subscribe([](std::tuple<int, int> const&){});

    s.connect();
    return 0;
}

and

#include <rxcpp/rx.hpp>

int main()
{
    auto s = rxcpp::observable<>::just(1).publish();

    rxcpp::observable<>::from(1,2,3,4)
        .merge(s.repeat().take(4))
        .subscribe([](int){});

    s.connect();
    return 0;
}
thorstink commented 4 years ago

Have you tried adding a coordination to your merge-operator? From the info you give I think that might be it.

(I didn’t test your code samples btw.. just chipping in)

It bit me once, see https://github.com/ReactiveX/RxCpp/issues/459.

kirkshoop commented 4 years ago

Hi,

This is causing a stack overflow in subscribe(). While this should not happen, there is a workaround.

make sure that the repeated subscription is subscribed on a coordination. s.subscribe_on(so).repeat()

Here are the examples with the changes needed to work.

int main()
{
    auto sc = rxcpp::rxsc::make_new_thread();
    auto so = rxcpp::synchronize_in_one_worker(sc);

    auto s = rxcpp::observable<>::just(1).publish();

    rxcpp::observable<>::from(so, 1,2,3,4)
        .zip(so, s.subscribe_on(so).repeat())
        .subscribe([](std::tuple<int, int> const&){});

    s.connect();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

and

int main()
{
    auto sc = rxcpp::rxsc::make_new_thread();
    auto so = rxcpp::synchronize_in_one_worker(sc);

    auto s = rxcpp::observable<>::just(1).publish();

    rxcpp::observable<>::from(so, 1,2,3,4)
        .merge(so, s.subscribe_on(so).repeat().take(4))
        .subscribe([](int){});

    s.connect();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}