ReactiveX / RxAndroid

RxJava bindings for Android
Apache License 2.0
19.89k stars 2.94k forks source link

Disposable is not disposing #379

Closed csshuai closed 6 years ago

csshuai commented 7 years ago
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
  private Disposable mDisposable;
  private Disposable mDisposable1;
  public void test() {
    Observable.create(new ObservableOnSubscribe<Boolean>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
        Log.d("test", "subscribe");
        e.onComplete();
      }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {
      @Override public void onSubscribe(@NonNull Disposable d) {
        mDisposable = d;
      }

      @Override public void onNext(@NonNull Boolean aBoolean) {

      }

      @Override public void onError(@NonNull Throwable e) {
        Log.d("test", "onError");
      }

      @Override public void onComplete() {
        Log.d("test", "onComplete");
        getHandler().postDelayed(new Runnable() {
          @Override public void run() {
            Log.d("test1", "isDisposed: " + mDisposable.isDisposed());
          }
        }, 1000);
      }
    });
  }

  public void test1() {
    Observable.create(new ObservableOnSubscribe<Boolean>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
        Log.d("test1", "subscribe");
        e.onComplete();
      }
    }).subscribe(new Observer<Boolean>() {
      @Override public void onSubscribe(@NonNull Disposable d) {
        mDisposable1 = d;
      }

      @Override public void onNext(@NonNull Boolean aBoolean) {

      }

      @Override public void onError(@NonNull Throwable e) {
        Log.d("test1", "onError");
      }

      @Override public void onComplete() {
        Log.d("test1", "onComplete");
        getHandler().postDelayed(new Runnable() {
          @Override public void run() {
            Log.d("test1", "isDisposed: " + mDisposable1.isDisposed());
          }
        }, 1000);
      }
    });
  }
06-09 09:49:34.451 5433-5433/xxx.debug D/test: subscribe
06-09 09:49:34.458 5433-5433/xxx.debug D/test1: subscribe
06-09 09:49:34.458 5433-5433/xxx.debug D/test1: onComplete
06-09 09:49:34.477 5433-5433/xxx.debug D/test: onComplete
06-09 09:49:35.461 5433-5433/xxx.debug D/test1: isDisposed: true
06-09 09:49:35.478 5433-5433/xxx.debug D/test: isDisposed: false

Other

Can add a method in AndroidSchedulers?

/** A {@link Scheduler} which executes actions on {@code handler}. */
  public static Scheduler from(Handler handler) {
    if (handler == null) throw new NullPointerException("handler == null");
    return new HandlerScheduler(handler);
  }
JakeWharton commented 7 years ago

Does this reproduce with Schedulers.computation()?

akarnokd commented 7 years ago

There is an inherent race there because dispose() is called after onComplete() thus dependent on non-deterministic thread scheduling. If thread running create() is also on the main thread, you'd see isDisposed() true guaranteed in the follow-up task.

iviglz commented 7 years ago

private Disposable mDisposable; private Disposable mDisposable1; public void test() { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Log.d("test", "subscribe"); e.onComplete(); } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { mDisposable = d; }

  @Override public void onNext(@NonNull Boolean aBoolean) {

  }

  @Override public void onError(@NonNull Throwable e) {
    Log.d("test", "onError");
  }

  @Override public void onComplete() {
    Log.d("test", "onComplete");
    getHandler().postDelayed(new Runnable() {
      @Override public void run() {
        Log.d("test1", "isDisposed: " + mDisposable.isDisposed());
      }
    }, 1000);
  }
});

}

public void test1() { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Log.d("test1", "subscribe"); e.onComplete(); } }).subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { mDisposable1 = d; }

  @Override public void onNext(@NonNull Boolean aBoolean) {

  }

  @Override public void onError(@NonNull Throwable e) {
    Log.d("test1", "onError");
  }

  @Override public void onComplete() {
    Log.d("test1", "onComplete");
    getHandler().postDelayed(new Runnable() {
      @Override public void run() {
        Log.d("test1", "isDisposed: " + mDisposable1.isDisposed());
      }
    }, 1000);
  }
});

} Duplicar de #366

leochuan commented 7 years ago

This issue is nothing to do with RxAndroid. Actually, if you observe on any other schedulers except Schedulers.trampoline() will lead to the same result. The onComplete method of ObservableEmitter looks like this:

static final class CreateEmitter<T> extends AtomicReference<Disposable> 
implements ObservableEmitter<T>, Disposable {
    ....
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    ....
}

It will dispose itself after onComplete was called. And Observer.onSubscribe(CreateEmitter) was called before ObservableOnSubscribe.subscribe. If there is no other operation, CreateEmitter is the Disposable you get from your onSubscribe.

But when you chain your Observable with observeOn, the Disposable you get changed. It will return a new Observable to downstream and when the subscribe method was called it will make the origin Observable subscribing its own Observer.In this case it subscribes ObserveOnObserver:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
    ...
    public void onSubscribe(Disposable s) {
        actual.onSubscribe(this);
    }
    ...
    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        schedule();
    }
    ...
    boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
        ...
        if(empty) {
            if (e != null) {
                a.onError(e);
            } else {
                a.onComplete();
            }
            worker.dispose();
            return true;
        }
        ...
    }
}

When the onComplete was called the ObserveOnObserver does not dispose itself. But at now, the ObservableEmitter has been disposed. So the connection between upstream and downstream is cut down. Also by reading the source code you can find that if your manually dispose the Disposable it will dispose up to the first Disposable to make the ObservableEmitter stop emitting.

So I suppose the author intended to do so, cause there is really no need to change the state of all the Disposable downstream.