ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Memory leak using repeat-operator on cold observable #538

Open pem-flr opened 4 years ago

pem-flr commented 4 years ago

The code shown below generates a memory leak. Any ideas what might be wrong here?

    #include <chrono>
    #include <iostream>
    #include <tuple>
    #include <rxcpp/rx.hpp>

    using namespace std::chrono_literals;
    namespace rxop = ::rxcpp::operators;

    // Cold range source, resubscribed forever by repeat().
    auto rotatingCounter = rxcpp::observable<>::range(0, 10)
        | rxop::repeat();

    // One tick every 20 ms.
    auto trigger = rxcpp::observable<>::interval(20ms);

    // Pair each tick with the next counter value.
    auto triggerWithCounter = trigger | rxop::zip(rotatingCounter);

    (triggerWithCounter | rxop::take(10000))
        .subscribe([](std::tuple<long, int> arg){
            std::cerr << 20 * std::get<0>(arg) << "ms, " << std::get<1>(arg) << std::endl;
        });

Edit: I've done some research on this, and it looks like a classic backpressure problem to me. The repeated rotatingCounter emits values far faster than the 20 ms trigger, so the zip operator's buffer keeps growing. I found this thread over at SO where somebody wrote a custom operator for this, but it doesn't look that elegant, and IMHO the massive overhead is out of proportion to the simple job the example above does. If anybody has an idea how to solve this elegantly using RxCpp, I'd be thankful for a proposal.
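
To make the buffer growth concrete, here is a toy model in plain C++ with no RxCpp involved. It is only a sketch under assumptions: ToyZip, simulate_zip_backlog and counter_from_ticks are made-up names, the "burst" of counter values between two ticks is a simplification, and nothing here is RxCpp's actual zip implementation. The first function shows the queue on the fast side growing with every tick. The second shows a pull-style alternative where the counter is derived from the tick index, so nothing is buffered.

    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <deque>
    #include <utility>
    #include <vector>

    // Hypothetical stand-in for a zip operator: one FIFO queue per source,
    // and a pair is emitted only when both queues hold a value.
    class ToyZip {
    public:
        void push_left(long v) { left_.push_back(v); drain(); }
        void push_right(int v) { right_.push_back(v); drain(); }
        std::size_t buffered_right() const { return right_.size(); }
        const std::vector<std::pair<long, int>>& output() const { return out_; }

    private:
        // Emit pairs while both sides have a value waiting.
        void drain() {
            while (!left_.empty() && !right_.empty()) {
                out_.emplace_back(left_.front(), right_.front());
                left_.pop_front();
                right_.pop_front();
            }
        }
        std::deque<long> left_;
        std::deque<int> right_;
        std::vector<std::pair<long, int>> out_;
    };

    // Assumed workload: before each slow tick, the fast source pushes `burst`
    // counter values cycling through 0..period-1. Returns how many fast
    // values are still queued after `ticks` ticks.
    std::size_t simulate_zip_backlog(int ticks, int burst, int period) {
        assert(ticks >= 0 && burst >= 0 && period > 0);
        ToyZip zip;
        int counter = 0;
        for (long t = 0; t < ticks; ++t) {
            for (int i = 0; i < burst; ++i) {
                zip.push_right(counter);
                counter = (counter + 1) % period;
            }
            zip.push_left(t);  // consumes exactly one queued counter value
        }
        return zip.buffered_right();
    }

    // Pull-style alternative: compute the counter from the tick index itself,
    // so there is no second source and nothing to buffer.
    std::vector<std::pair<long, int>> counter_from_ticks(int ticks, int period) {
        assert(ticks >= 0 && period > 0);
        std::vector<std::pair<long, int>> out;
        for (long t = 0; t < ticks; ++t) {
            out.emplace_back(t, static_cast<int>(t % period));
        }
        return out;
    }
    ```

With a burst of 11 values per tick, the backlog in this model grows by 10 entries on every tick. With a burst of 1 it stays empty. In RxCpp terms, the second function would roughly correspond to mapping the value emitted by interval to a counter with rxop::map instead of zipping against a repeated range, though I haven't verified that against the example above.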