ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Memory Leak related to concurrency #132

Closed: jpattersonz closed this issue 9 years ago

jpattersonz commented 9 years ago

There is a fairly easy way to leak memory in rxcpp. It only occurs once you introduce concurrency.

Here are a few contrived examples which create a leak:

while (true)
{
    rxcpp::sources::interval(std::chrono::milliseconds(100))
        .subscribe_on(rx::observe_on_new_thread())
        .take_until(rx::sources::timer(std::chrono::milliseconds(1)))
        .as_blocking()
        .subscribe();
}

This will do it too:

while (true)
{
    rxcpp::sources::range(1, 1000000000)
        .subscribe_on(rx::observe_on_new_thread())
        .filter([](int x) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return true; })
        .take_until(rx::sources::timer(std::chrono::milliseconds(1)))
        .as_blocking()
        .subscribe();
}

And this one:

while (true)
{
    auto pipeline =
        rxcpp::sources::range(1, 1000000000)
        .filter([](int x) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return true; })
        ;

    auto subscription = pipeline.subscribe_on(rx::observe_on_new_thread()).publish().connect();

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    subscription.unsubscribe();
}

In my real-world use case, I have my own source which produces multimedia samples and is implemented using the rxcpp scheduling abstractions (i.e. not just a loop; see issue #129). The examples above appear to leak an individual value in the pipeline (an educated guess). In my case, each value is a multimedia sample and is much bigger than an int, so the memory leak is very noticeable.

Hopefully I've provided enough examples for someone much smarter than myself to chase down the cause.

Thanks,

Jeff

kirkshoop commented 9 years ago

Thanks for the report!

I do not think that rxcpp is leaking a value. I used this test to verify:

struct LeakDetector
{
    static int leaks;
    int id;
    ~LeakDetector(){
        std::cout << "~leak " << --leaks << ", id " << id << std::endl;
    }
    LeakDetector(const LeakDetector& o) : id(o.id) {
        std::cout << "+leak " << ++leaks << ", id " << id << std::endl;
    }
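    LeakDetector& operator=(LeakDetector o) {
        // Assignment reuses an existing object, so the live count must not change here;
        // the by-value parameter is already counted by the copy constructor and destructor.
        id = o.id;
        std::cout << "=leak " << leaks << ", id " << id << std::endl;
        return *this;
    }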
    LeakDetector(int id) : id(id) {
        std::cout << "ileak " << ++leaks << ", id " << id << std::endl;
    }
    std::array<int, 1000> d;
};
int LeakDetector::leaks = 0;

template<class Coordination>
auto make_exp(Coordination cn) -> rx::observable<LeakDetector> {
    return rxcpp::sources::interval(std::chrono::milliseconds(100))
        .map([](int id){return LeakDetector(id);})
        .subscribe_on(cn)
        .take_until(rx::sources::timer(std::chrono::milliseconds(1), cn))
        .as_dynamic();
}

SCENARIO("leaks", "[take][passes][operators]"){
    GIVEN("an async source"){

        WHEN("new thread is blocking subscribed"){

            for (int i = 0; i < 10; ++i) {
                make_exp(rx::observe_on_new_thread())
                    .as_blocking()
                    .subscribe();
            }

            THEN("the output is not leaks"){
                REQUIRE(LeakDetector::leaks == 0);
            }
        }
    }
}

The output reports:

Scenario: leaks
     Given: an async source
      When: new thread is blocking subscribed
      Then: the output is not leaks
/Users/kirk/source/rxcpp/Rx/v2/examples/tests/take.cpp:40

/Users/kirk/source/rxcpp/Rx/v2/examples/tests/take.cpp:52: 
PASSED:
  REQUIRE( LeakDetector::leaks == 0 )
with expansion:
  0 == 0

I will look for other leaks.

kirkshoop commented 9 years ago

I now see leaks when using the new_thread scheduler. When I switched the test code to the current_thread scheduler, the leaks disappeared.

So now I am trying to find the leak. My first suspicion is that there is a circular reference in the shared pointers.
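For readers unfamiliar with this failure mode, here is a minimal sketch of how two objects that hold std::shared_ptr references to each other can outlive every external handle. The types `Subscription` and `Worker` and the function `leaked_after_cycle` are hypothetical names made up for illustration; they are not rxcpp classes and do not show where the actual cycle in rxcpp was.

```cpp
#include <cassert>
#include <memory>

// Hypothetical types for illustration only; these are not rxcpp classes.
struct Worker;

struct Subscription {
    static int live;                 // number of Subscription objects currently alive
    std::shared_ptr<Worker> worker;  // strong reference: Subscription -> Worker
    Subscription() { ++live; }
    ~Subscription() { --live; }
};
int Subscription::live = 0;

struct Worker {
    static int live;                      // number of Worker objects currently alive
    std::shared_ptr<Subscription> owner;  // strong reference back: Worker -> Subscription
    Worker() { ++live; }
    ~Worker() { --live; }
};
int Worker::live = 0;

// Builds a two-object cycle, drops the local handles, and returns how many
// objects were still alive at that point.
int leaked_after_cycle() {
    const int before = Subscription::live + Worker::live;
    std::weak_ptr<Worker> probe;  // non-owning handle, used only to observe and clean up
    {
        auto s = std::make_shared<Subscription>();
        auto w = std::make_shared<Worker>();
        s->worker = w;
        w->owner = s;  // closes the cycle
        probe = w;
    }  // s and w are gone, but each object still keeps the other alive

    assert(!probe.expired());  // the Worker survived the scope: evidence of the cycle
    const int leaked = Subscription::live + Worker::live - before;

    // Break the cycle by hand so that this demonstration does not itself leak.
    if (auto w = probe.lock()) {
        w->owner.reset();
    }
    return leaked;
}
```

Calling `leaked_after_cycle()` reports two surviving objects even though no local handle remains, which is the same kind of signal a nonzero `LeakDetector::leaks` gives in the test above.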

kirkshoop commented 9 years ago

Yep, there were some circular strong references. These are now broken either by switching to weak references or by manually breaking the cycle at the end of the lifetime.
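The two remedies mentioned here can be sketched with the standard library alone. This is an illustration under assumptions, not the actual rxcpp change: `Node`, `live_after_weak_back_reference`, and `live_after_manual_break` are hypothetical names, and the real fix may be structured quite differently.

```cpp
#include <cassert>
#include <memory>

// Hypothetical type for illustration only; this is not an rxcpp class.
struct Node {
    static int live;                    // number of Node objects currently alive
    std::shared_ptr<Node> strong_peer;  // owning link
    std::weak_ptr<Node> weak_peer;      // non-owning link
    Node() { ++live; }
    ~Node() { --live; }
};
int Node::live = 0;

// Remedy 1: make the back-reference weak, so the pair never owns itself.
int live_after_weak_back_reference() {
    {
        auto parent = std::make_shared<Node>();
        auto child = std::make_shared<Node>();
        parent->strong_peer = child;  // parent owns child
        child->weak_peer = parent;    // child only observes parent
        // A weak link must be locked before use; here it is still valid.
        assert(child->weak_peer.lock() == parent);
    }
    return Node::live;  // both nodes were destroyed when the scope ended
}

// Remedy 2: keep both links strong, but reset one at the end of the lifetime.
int live_after_manual_break() {
    {
        auto a = std::make_shared<Node>();
        auto b = std::make_shared<Node>();
        a->strong_peer = b;
        b->strong_peer = a;      // strong cycle while the pair is in use
        a->strong_peer.reset();  // explicit teardown step breaks the cycle
    }
    return Node::live;  // both nodes were destroyed when the scope ended
}
```

Both functions return 0 live nodes. The weak link has the cost that every use must go through `lock()` and handle an expired target, while the manual reset relies on the teardown step actually running on every exit path.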

kirkshoop commented 9 years ago

Thanks for the report!

jpattersonz commented 9 years ago

Nice work!

On Jun 18, 2015 11:49 PM, "Kirk Shoop" wrote:

Yep, there were some circular strong references. Now these are being broken by either switching to weak or manually breaking the loop at end of lifetime.
