ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

Leak of Subscriber in case of long blocking execution after unsubscribe() #3148

Closed artem-zinnatullin closed 9 years ago

artem-zinnatullin commented 9 years ago

@nsk-mironov pointed me to a leak of the Subscriber in the case of a long blocking call inside the Observable.

The problem is that Subscriber stores the reference to the "child" Subscriber in a final field, and after unsubscribe() this reference is not nulled.

Imagine you're making a long blocking call in the Observable, for example an I/O operation that can take seconds or even minutes, and before it completes you decide to unsubscribe from it (for example, the user leaves the screen and we no longer want to keep references to this screen, leaving it to the GC). The result is a memory leak of the Subscriber, which can also hold references to other huge objects such as the screen (Activity or Fragment in the case of Android); these stay alive until the Observable completes.

Simple test that shows the problem:

@Test
public void shouldReleaseReferenceToTheSubscriberAfterUnsubscribe() {
    final CountDownLatch countDownLatch = new CountDownLatch(1);

    Subscriber<Object> subscriber = new TestSubscriber<Object>();

    Subscription subscription = Observable
            .create(new OnSubscribe<Object>() {
                @Override
                public void call(Subscriber<? super Object> subscriber) {
                    try {
                        // It could be anything: long network request, DB query, file read, anything
                        countDownLatch.await();
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        subscriber.onError(e);
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(subscriber);

    subscription.unsubscribe();

    // Even after unsubscribe() there are still strong references to the Subscriber:
    // 1: Subscriber line 39, final field "subscriber"
    // 2: SafeSubscriber line 62, final field "actual"
    // 3: OperatorSubscribeOn and our OnSubscribe keep a strong reference to 2, and 2 in turn keeps a reference to 1

    countDownLatch.countDown();

    // Just to keep reference visible to the debugger after last real usage
    subscription.toString();
}

I can provide more information later, it's about 5 am, I need to sleep.

To see the references to the object in IntelliJ IDEA, debug the code, right-click the object and select "Show Referring Objects"; you will see something like this:

[Screenshot: IntelliJ IDEA "Show Referring Objects" view, 2015-08-11]

akarnokd commented 9 years ago

Holding it in a final reference is a tradeoff between overhead and retention. A final field has the lowest overhead and since it isn't volatile, the JIT can do more optimizations with the chain.

Most blocking I/O I know of can be interrupted or at least closed asynchronously. For example, if you open a file, you can wrap the close() call into a Subscription and add it to the Subscriber.
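
The close-on-unsubscribe idea above can be sketched in plain Java. This is only an illustration, not RxJava code: `CancellableWork` is a hypothetical stand-in for a Subscriber's list of cleanup actions, and in RxJava 1.x the corresponding step would be adding a Subscription that calls close() to the Subscriber.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Subscriber's cleanup list; not RxJava API.
final class CancellableWork {
    private final List<Closeable> resources = new ArrayList<>();
    private boolean unsubscribed;

    // Registers a resource to close on unsubscribe.
    // If unsubscribe() already happened, the resource is closed right away.
    synchronized void add(Closeable resource) {
        if (unsubscribed) {
            closeQuietly(resource);
        } else {
            resources.add(resource);
        }
    }

    // Closes every registered resource and drops the references to them.
    synchronized void unsubscribe() {
        unsubscribed = true;
        for (Closeable resource : resources) {
            closeQuietly(resource);
        }
        resources.clear();
    }

    synchronized boolean isUnsubscribed() {
        return unsubscribed;
    }

    synchronized int pendingResources() {
        return resources.size();
    }

    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: a failed close must not prevent closing the others.
        }
    }
}
```

Closing the stream or socket makes the blocked read fail, so the worker thread returns and releases whatever it was holding. This only helps when the blocking call really should be aborted on unsubscribe.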

The test example might not run the Observable body at all, it really depends on how fast an io() thread can jump into life.

artem-zinnatullin commented 9 years ago

> Holding it in a final reference is a tradeoff between overhead and retention. A final field has the lowest overhead and since it isn't volatile, the JIT can do more optimizations with the chain.

Yep, I understand this. We need to measure the performance difference between a final and a non-final reference to the Subscriber in Subscriber.subscriber and SafeSubscriber.actual before making a decision.

> Most blocking I/O I know of can be interrupted or at least closed asynchronously. For example, if you open a file, you can wrap the close() call into a Subscription and add it to the Subscriber.

That's not a solution for most of the RxJava usages we currently have in our projects, and I guess it's not a solution for other apps, teams and companies either. For example: an application makes a slow network request wrapped in an Observable, and we don't want to interrupt it if the user leaves the screen; we want to unsubscribe from it and let it keep executing.

Also, there are a lot of places where nobody interrupts blocking operations wrapped in an Observable: third-party libraries that offer Rx support such as Retrofit, user-defined OnSubscribe implementations and so on.

I see 3 possible solutions:

  1. Make Subscriber.subscriber and SafeSubscriber.actual non-final and null them when the Subscriber is unsubscribed; this looks like the best solution.
  2. Use a WeakReference in Subscriber.subscriber and SafeSubscriber.actual, but this variant is slower than the first.
  3. Fix the actual "leakers", the implementations of Observable.OnSubscribe: they could store the reference to the Subscriber as a WeakReference, but it's not a solution because there are a lot of operators and custom OnSubscribe implementations in user code.
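
To make option 1 concrete, here is a minimal sketch in plain Java. `DetachableSubscriber` is a hypothetical wrapper, not the RxJava Subscriber, and the downstream is modeled as a `java.util.function.Consumer` for brevity. The field is non-final and volatile, which is exactly the overhead discussed above.

```java
import java.util.function.Consumer;

// Hypothetical wrapper, not RxJava API: forwards values downstream until detached.
final class DetachableSubscriber<T> {
    // Non-final so it can be nulled; volatile so that an unsubscribe() on one
    // thread becomes visible to the thread that is producing values.
    private volatile Consumer<? super T> downstream;

    DetachableSubscriber(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        // Read the field once so a concurrent unsubscribe() cannot null it
        // between the check and the call.
        Consumer<? super T> current = downstream;
        if (current != null) {
            current.accept(value);
        }
    }

    // Drops the only reference to the downstream, making it collectable even
    // while the producer is still blocked and still holds this wrapper.
    void unsubscribe() {
        downstream = null;
    }

    boolean isUnsubscribed() {
        return downstream == null;
    }
}
```

The producer keeps a reference to the small wrapper only, so a blocked call no longer pins the downstream and everything it captures.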

> The test example might not run the Observable body at all, it really depends on how fast an io() thread can jump into life.

This test example is not an actual test (it never fails; I'm not sure how to write a test that fails if there are references to the Subscriber after unsubscribe()); it's just sample code that can easily be debugged to see the problem.

akarnokd commented 9 years ago

I assume this problem comes up with Android development where you tend to leak the view or some other resource. You could apply your suggestions to the reference to such resources instead of the Subscriber.

artem-zinnatullin commented 9 years ago

@akarnokd yes, this problem is significant for Android development, and I guess it's significant for any other client-side development with RxJava.

Typical usage of RxJava in Android projects:

void onSendTweet() {
  tweetsService
    .sendTweet("@akarnokd, let's fix memory leak in RxJava, pleeeeease")
    .subscribeOn(io())
    .observeOn(mainThread())
    .subscribe(result -> {
      someView.onTweetSent(); // We are leaking someView and the instance of the enclosing class
    });
}

To avoid this leak we need to create a static class with a WeakReference for the Subscriber each time we want to do something with RxJava, it's crazy :(

void onSendTweet() {
  tweetsService
    .sendTweet("@akarnokd, let's fix memory leak in RxJava, pleeeeease")
    .subscribeOn(io())
    .observeOn(mainThread())
    .subscribe(new SendTweetSubscriber(this));
}

static class SendTweetSubscriber extends Subscriber<Result> {
  private final WeakReference<MyView> myViewWeakReference;

  SendTweetSubscriber(MyView myView) {
    myViewWeakReference = new WeakReference<>(myView);
  }

  @Override public void onCompleted() {
    // The view may already have been collected; only touch it if it is still alive
    final MyView myView = myViewWeakReference.get();

    if (myView != null) {
      myView.someView.onTweetSent();
    }
  }

  // Subscriber is abstract, so the remaining callbacks need bodies as well
  @Override public void onError(Throwable e) { }

  @Override public void onNext(Result result) { }
}

A lot of boilerplate code is required each time just to subscribe to the operation.

I understand that performance is a major part of RxJava, but let's measure the difference in performance between a final and a non-final reference to the Subscriber first?

artem-zinnatullin commented 9 years ago

An alternative solution: a WeakSubscriber that stores the reference to the original Subscriber in a WeakReference. I guess it could be part of RxJava.

nsk-mironov commented 9 years ago

@artem-zinnatullin WeakSubscriber is a bad idea. The original Subscriber is an anonymous object in most cases, and it will be collected by the GC almost instantly.

nsk-mironov commented 9 years ago

The issue can be solved with a custom operator, and actually I already implemented one a long time ago: https://github.com/ReactiveX/RxAndroid/pull/13

artem-zinnatullin commented 9 years ago

@nsk-mironov damn, you're right.

dlew commented 9 years ago

This is a lot of text to parse through, so I'm going to try to simplify (and see if my understanding is correct):

  1. You have a long-running Subscription created within an Activity.
  2. You want the Subscription to clear its references to the Activity after it dies (to avoid memory leaks).
  3. You want the Subscription to live on past the death (to avoid redoing work).

As an alternative to clearing out the Subscription, why not use an Observable that can be resubscribed to? E.g., use cache(). Then each Activity using the Observable can handle its own references (and properly unsubscribe/null out references after death).

JakeWharton commented 9 years ago

> As an alternative to clearing out the Subscription, why not use an Observable that can be resubscribed to? E.g., use cache(). Then each Activity using the Observable can handle its own references (and properly unsubscribe/null out references after death).

I've also done this with subjects in the past which were owned by components that live outside of the view/activity lifecycle. Even using cache() is going to require some location independent of lifecycle from which the observable can be shared.
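
A rough sketch of that arrangement in plain Java, with no RxJava involved: `SharedResult` is a hypothetical holder owned by something that outlives the screens (application scope, for instance). It plays the role that cache() or a subject would play, letting screens attach and detach freely while the work runs once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical holder that lives outside the view/activity lifecycle; not RxJava API.
final class SharedResult<T> {
    private final List<Consumer<? super T>> listeners = new ArrayList<>();
    private T value;
    private boolean done;

    // Attaches a listener; replays the cached value if the work already finished.
    synchronized void subscribe(Consumer<? super T> listener) {
        if (done) {
            listener.accept(value);
        } else {
            listeners.add(listener);
        }
    }

    // Detaches a listener so the holder no longer references the screen behind it.
    synchronized void unsubscribe(Consumer<? super T> listener) {
        listeners.remove(listener);
    }

    // Called by the long-running work when it finishes.
    synchronized void complete(T result) {
        value = result;
        done = true;
        for (Consumer<? super T> listener : listeners) {
            listener.accept(result);
        }
        // Nothing is retained past completion except the cached value.
        listeners.clear();
    }

    synchronized int listenerCount() {
        return listeners.size();
    }
}
```

A screen that is destroyed calls unsubscribe() with its own listener, and a recreated screen calls subscribe() again and receives the cached result without the request being repeated. Invoking listeners while holding the lock keeps the sketch short; real code would likely deliver outside the lock.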

konmik commented 9 years ago

I don't think that we should call this a leak. This is a known behaviour, we can just document it.

konmik commented 9 years ago

Here is another thought: if we nullify the reference, we lose the ability to resubscribe a second time because the chain is now broken.

AllenVork commented 6 years ago

Now a lot of projects are facing memory leaks when using RxJava due to the final references. Some people suggest using the clear method instead of unsubscribe, and others suggest nulling out the subscription. Can anybody give me an efficient answer?

akarnokd commented 6 years ago

Depends on how you store the references. If in CompositeSubscription, clear it. If in a local Subscription field, null it after calling unsubscribe().
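
The two storage styles can be sketched in plain Java as follows. `HandleBag` and `Screen` are hypothetical names, with `HandleBag` standing in for a composite of subscriptions and a `Runnable` standing in for a single subscription's cancel action; none of this is the RxJava API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a composite of subscriptions; not the RxJava class.
final class HandleBag {
    private final List<Runnable> cancelActions = new ArrayList<>();

    synchronized void add(Runnable cancelAction) {
        cancelActions.add(cancelAction);
    }

    // Cancels everything and empties the bag, which stays usable afterwards.
    synchronized void clear() {
        for (Runnable cancelAction : cancelActions) {
            cancelAction.run();
        }
        cancelActions.clear();
    }

    synchronized int size() {
        return cancelActions.size();
    }
}

// Hypothetical screen that uses both storage styles.
final class Screen {
    final HandleBag bag = new HandleBag();
    Runnable single; // a lone handle kept in a field

    void onDestroy() {
        // Composite case: clear it so it holds no handles.
        bag.clear();
        // Field case: cancel first, then drop the reference.
        if (single != null) {
            single.run();
            single = null;
        }
    }
}
```

In both cases the point is that the screen stops referencing the handle after cancelling it, so the handle and its chain are no longer reachable through the screen.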

AllenVork commented 6 years ago

I'm using CompositeSubscription with the clear method, but I still get a memory leak.

akarnokd commented 6 years ago

Can you trace the source of the leak with LeakCanary or similar tools?