ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.03k stars 390 forks source link

Memory leak in use of switch_on_next #535

Closed daixtrose closed 4 years ago

daixtrose commented 4 years ago
#include <catch2/catch.hpp>

#include <rxcpp/rx.hpp>

#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <tuple>

namespace utility {

class StayTrueForAWhile {
    std::unique_ptr<std::mutex> mtx_;
    std::future<void> returnInAWhile_;
    mutable std::future_status status_;

public:
    template <typename Rep, typename Period>
    StayTrueForAWhile(std::chrono::duration<Rep, Period> const& duration)
        : mtx_{ std::make_unique<std::mutex>() }
        , returnInAWhile_{ std::async(std::launch::async, [=]() {
            std::this_thread::sleep_for(duration);
        }) } {}

    StayTrueForAWhile(StayTrueForAWhile const&) = delete;
    StayTrueForAWhile(StayTrueForAWhile&&) = default;
    StayTrueForAWhile& operator=(StayTrueForAWhile const&) = delete;
    StayTrueForAWhile& operator=(StayTrueForAWhile&&) = default;

    operator bool() const {
        std::lock_guard<std::mutex> const lock{ *mtx_ };
        constexpr auto minimumWaitTime = std::chrono::milliseconds(10);
        status_ = returnInAWhile_.wait_for(minimumWaitTime);
        return (status_ != std::future_status::ready);
    }
};

template <typename Rep, typename Period>
auto trueForATimeSpanOf(std::chrono::duration<Rep, Period> const& duration) {
    StayTrueForAWhile result{ duration };
    return std::move(result);
}

} // namespace utility

using Msg = int;

class C {
public:
    static constexpr Msg msg = 42;

private:
    rxcpp::subjects::subject<Msg> subject1_;
    rxcpp::observable<Msg> obs1_;
    rxcpp::composite_subscription subscription_;

public:
    C()
        : subject1_{}
        , obs1_{ subject1_.get_observable() } {
        setupSubscription();
    }

    ~C() { subscription_.unsubscribe(); }

    void fireS1(Msg msg) { subject1_.get_subscriber().on_next(msg); }

private:
    void setupSubscription() {
        namespace rxop = ::rxcpp::operators;
        using namespace std::chrono_literals;

        // auto so = rxcpp::observe_on_event_loop();
        auto sc = rxcpp::rxsc::make_new_thread();
        auto so = rxcpp::synchronize_in_one_worker(sc);

        auto emitWhenSilent = [&](auto& sequence,
                                  auto waitTimespan,
                                  auto intervalTimespan,
                                  auto emitterValue) {
            struct Empty {};
            static const Empty empty;
            return sequence | rxop::map([=](auto) { return empty; })
                | rxop::start_with(empty) // ensure time is triggered once
                | rxop::map([=](auto /*ignored */) {
                       auto start
                           = std::chrono::steady_clock::now() + waitTimespan;
                       return rxcpp::observable<>::interval(
                                  start, intervalTimespan, so)
                           | rxop::as_dynamic();
                   })
                | rxop::switch_on_next(so)
                | rxop::map([=](auto) { return emitterValue; });
        };

        auto e = obs1_ | rxop::observe_on(so);

        auto s = emitWhenSilent(e, 30ms, 10ms, msg);
        subscription_ = s.subscribe([](auto) { std::cerr << "--> TAP\n"; });

        std::cerr << "ESCAPED!\n";
    }
};

TEST_CASE("Reproduce memory leak", "[RxCpp]") {

    using namespace std::chrono_literals;

    C c{};

    auto goAhead = utility::trueForATimeSpanOf(2s);

    std::thread t1{ [&]() {
        while (goAhead) {
            c.fireS1(C::msg);
            std::this_thread::sleep_for(200ms);
        }
        std::cerr << "================== T1 terminated "
                     "==================\n";
    } };

    t1.join();
}

compile with g++-9 (Ubuntu 9.2.1-17ubuntu1~18.04.1) 9.2.1 20191102

/usr/bin/g++-9 -I/dependencies/catch2-src/single_include \
    -I/dependencies/rxcpp-src/Rx/v2/src -g   \
    -pedantic -fno-omit-frame-pointer -Wuninitialized \
    -Wunused-parameter -Wno-psabi -fPIC -fsanitize=address \
    -std=gnu++2a -o this_file.cpp.o -c this_file.cpp

Let the test run several times, at some point in time you will obtain a leak detection, see output.txt

kirkshoop commented 4 years ago

Hi Markus!

I was able to repro this with the code you gave.

The short answer is that this is not a memory leak. The thread join still leaves work running in rxcpp. That work will complete and release the resources in time (I tried adding a sleep_for(5s) and did not see asan failures)

The long answer is that I have been trying to find a way to compose the rxcpp signals to get a reliable end-of-work signal. I have not been successful yet.

The longer answer is that we are doing a lot of work trying to define structured concurrency to eliminate these issues in lib unifex. operations will have a way to signal that the state has been torn down.

daixtrose commented 4 years ago

I can confirm that a release version running for several minutes does not change its memory footprint. So this issue can be closed, but it will enter my collection of things that nearly killed me.

kirkshoop commented 4 years ago

Sorry about that, thanks for confirming. And sorry about the pain.