ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0
3.07k stars 396 forks source link

Take causes memory leak #452

Open simonvpe opened 6 years ago

simonvpe commented 6 years ago

There is a circular dependency here. I'll try to figure out how to resolve it myself I presume. It would be a good idea to run the tests with valgrind or sanitizers though, since a lot of issues are about memory leaks.

    template<class Subscriber>
    void on_subscribe(const Subscriber& s) const {

        typedef Subscriber output_type;
        struct state_type
            : public std::enable_shared_from_this<state_type>
            , public values
        {
            state_type(const values& i, const output_type& oarg)
                : values(i)
                , mode_value(mode::taking)
                , out(oarg)
            {
            }
            typename mode::type mode_value;
            output_type out;
        };
        // take a copy of the values for each subscription
        auto state = std::make_shared<state_type>(initial, s);

        composite_subscription source_lifetime;

        s.add(source_lifetime);

        state->source.subscribe(
        // split subscription lifetime
            source_lifetime,
        // on_next
            [state, source_lifetime](T t) {
                if (state->mode_value < mode::triggered) {
                    if (--state->count > 0) {
                        state->out.on_next(t);
                    } else {
                        state->mode_value = mode::triggered;
                        state->out.on_next(t);
                        // must shutdown source before signaling completion
                        source_lifetime.unsubscribe();
                        state->out.on_completed();
                    }
                }
            },
        // on_error
            [state](std::exception_ptr e) {
                state->mode_value = mode::errored;
                state->out.on_error(e);
            },
        // on_completed
            [state]() {
                state->mode_value = mode::stopped;
                state->out.on_completed();
            }
        );
    }
kirkshoop commented 6 years ago

Yes, there are many circular references in rxcpp. These are broken by the unsubscribe signal. Is there an expression that actually demonstrates a leak?

simonvpe commented 6 years ago

After producing a MCVE I'm not sure it is take that is the problem anymore.

#include <rxcpp/rx.hpp>
#include <iostream>

struct foo : public rxcpp::sources::source_base<int> {
  static int refcount;

  explicit foo() { std::cout << "++refcount = " << ++refcount << '\n'; }
  ~foo() { std::cout << "--refcount = " << --refcount << '\n'; }
  foo(const foo &other) { std::cout << "++refcount = " << ++refcount << '\n'; }
  foo(foo &&) { std::cout << "++refcount = " << ++refcount << '\n'; }
  foo &operator=(foo &&) = default;
  foo &operator=(const foo &) = default;

  template <typename Subscriber>
  void on_subscribe(Subscriber subscriber) const {
    for (auto i = 0; i < 100; ++i) {
      subscriber.on_next(i);
    }
    subscriber.on_completed();
  }
};

auto create_foo_observable() {
  return rxcpp::observable<int, foo>{foo{}}
      .lift<int>([](rxcpp::subscriber<int> subscriber) {
        subscriber.get_subscription().add([] {
          std::cout << "Unsubscribed (refcount = " << foo::refcount << ")\n";
        });
        return subscriber;
      })
      .as_dynamic();
}

int foo::refcount = 0;

int main() {
  {
    create_foo_observable()
        .observe_on(rxcpp::observe_on_new_thread())
        .take(10)
        .as_blocking()
        .subscribe([](auto) {});
  }
  std::cout << "Final refcount is " << foo::refcount << '\n';
}
++refcount = 1
++refcount = 2
++refcount = 3
++refcount = 4
++refcount = 5
--refcount = 4
--refcount = 3
++refcount = 4
++refcount = 5
++refcount = 6
++refcount = 7
--refcount = 6
--refcount = 5
--refcount = 4
--refcount = 3
--refcount = 2
--refcount = 1
Unsubscribed (refcount = 1)
Final refcount is 1

If I remove .observe_on(rxcpp::observe_on_new_thread()) the final refcount is 0. Maybe this is not how you are supposed to create your own observables? In my real code I start up an async process using boost::asio that calls the methods on the subscriber, but the general boilerplate is about the same.

simonvpe commented 6 years ago

If I use rxcpp::observable<>::create() the result is more pleasing however.

#include <rxcpp/rx.hpp>

struct bar {
  static int refcount;

  explicit bar() { std::cout << "++refcount = " << ++refcount << '\n'; }
  ~bar() { std::cout << "--refcount = " << --refcount << '\n'; }
  bar(const bar &other) { std::cout << "++refcount = " << ++refcount << '\n'; }
  bar(bar &&) { std::cout << "++refcount = " << ++refcount << '\n'; }
  bar &operator=(bar &&) = default;
  bar &operator=(const bar &) = default;
};

int bar::refcount = 0;

int main() {
  {
    rxcpp::observable<>::create<int>([state = bar{}](auto subscriber) {
      for (auto i = 0; i < 100; ++i) {
        subscriber.on_next(i);
      }
      subscriber.on_completed();
    })
        .observe_on(rxcpp::observe_on_new_thread())
        .take(10)
        .as_blocking()
        .subscribe([](auto) {});
  }

  // <- Need to wait for cleanup or `bar::refcount == 1`
  using namespace std::chrono_literals;
  std::this_thread::sleep_for(100ms);

  std::cout << "Final refcount is " << bar::refcount << '\n';
}
++refcount = 1
++refcount = 2
++refcount = 3
++refcount = 4
++refcount = 5
--refcount = 4
--refcount = 3
--refcount = 2
++refcount = 3
++refcount = 4
++refcount = 5
--refcount = 4
--refcount = 3
++refcount = 4
++refcount = 5
++refcount = 6
--refcount = 5
++refcount = 6
--refcount = 5
--refcount = 4
++refcount = 5
++refcount = 6
--refcount = 5
++refcount = 6
--refcount = 5
--refcount = 4
--refcount = 3
--refcount = 2
--refcount = 1
--refcount = 0
Final refcount is 0
kirkshoop commented 6 years ago

I expect the the first will get to zero if the sleep in the second is added before printing the final refcount.

The refcount waits for the observe_on to shutdown.