ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0
47.88k stars 7.61k forks source link

2.x Proposal: disposeOn #4646

Closed ZacSweers closed 7 years ago

ZacSweers commented 8 years ago

Wanted to put this up for discussion:

An active concern in Android is binding execution to the lifecycle of something (Activity, View, etc). Disposable in RxJava 2 looks potentially useful for this if there could be a way to conditionally dispose against a given lifecycle (potentially represented as another Flowable). It would look very similar to takeUntil(), but with disposable semantics (not calling anymore events, whereas takeUntil() completes). I'm not sure where this would fit, but wanted to put it up for discussion as I'm sure there's RxJava 2 semantics I'm missing. My proposal would be something as a sibling to subscribeWith. Taking the returned Disposable is ok, but not ideal as it can quickly involve wrapping long chains and playing LISP with finding the closing parens later.

Ideally, we would be able to do something like this:

someFlowable
    ...
    .disposeOn(new DisposeOnObserver(lifecycle()) {
        // Override other methods as needed
    })

or if it's possible to keep it as an operator (though I'm guessing this is wishful thinking):

someFlowable
    ...
    .disposeOn(lifecycle())
    .subscribe(...)
JakeWharton commented 8 years ago

Why isn't this just a lifecycle-aware implementation of Observer? I don't see why RxJava needs to do anything.

ZacSweers commented 8 years ago

It could be for the first one, but the Observer would have to maintain an internal subscription to the lifecycle wouldn't it? It also requires you to use that observer implementation, which something like compose or the second solution above would help alleviate.

JakeWharton commented 8 years ago

For the second solution it seems like takeUntil would work.

.takeUntil(lifecycle())

or better yet be explicit

.takeUntil(stopped())
.takeUntil(detached())
ZacSweers commented 8 years ago

Yes, but I'm trying to avoid the completion event (which causes onError in Single and in general doesnt quite match up with the behavior of disposable). TakeUntil is how rxlifecycle works currently, but it used to have something like a SubscribeUntil that just unsubscribed downstream rather than completing. Iirc that wasn't very elegant/safe in rxjava 1, which is why I was bringing this up here in case Rxjava2/disposable made controlling downstream subscriptions easier.

akarnokd commented 8 years ago

dispose/cancel travels upstream and it is impossible to make it travel downstream. Disposing in the middle and not sending a terminal event downstream keeps it hanging and possibly leaking resources.

abersnaze commented 8 years ago

Compose it with never() static Observable method.

x.takeUntil(predicate).concatWith(never());

dalewking commented 8 years ago

It would seem one way to accomplish this is if Disposable had additional functionalities so that you can chain additional method calls to it.

The easiest approach would be to add a "to" method to Disposable like the other types have, so that you can apply a function to the disposable as in this signature:

 public final <R> R to(Function<? super Disposable, R> converter)

Then you could do:

someFlowable
    ...
    .subscribe(...)
    .to(lifecycle());

where lifecycle returns a function that accepts the Disposable and returns some type.

This way it is at the end of the chain so it only goes upstream.

Not sure "to" is the best name but it matches the method in Observable and friends. Or perhaps you do compose and it has to have a function that takes and returns a Disposable, but to is more general purpose.

dalewking commented 8 years ago

Seems like another way to go would be to use the "to" method of the Observable etc. In this case it will be enforced that only subscribe methods can be called after binding to the lifecycle. So you have something like this:

someFlowable
    ...
    .to(lifecycle())
    .subscribe(...);

In this case lifecyle returns some type that only has the subscribe methods from Observable etc.

You could accomplish this with no changes to RxJava

akarnokd commented 8 years ago

No additional methods to Disposable please.

ZacSweers commented 8 years ago

@abersnaze that doesn't seem to work, events still happen normally

akarnokd commented 7 years ago

I don't see how RxJava would work here either. There are several operators available to handle interruption of flows based on external reactive events:

// onComplete consumers
main.takeUntil(activityClosedStream());

// error consumers
CancellationException cancelled = new CancellationException();
main.takeUntil(activityClosedStream().map(v -> { throw cancelled; }));

// keep consumers in the dark
main.takeUntil(activityClosedStream().map(v -> { throw cancelled; }))
.onErrorResumeNext(e -> 
    e instanceof CancellationException ? Flowable.never() : Flowable.error(e));
ZacSweers commented 7 years ago

The never() example almost gets there, except that the consumer never unsubscribes in that case (unless I'm missing something). Ultimately the best case solution is something that automatically unsubscribes and doesn't require awkward wrapping of the chain or manual handling of the disposable.

dalewking commented 7 years ago

I think the only way to handle this is inserting the lifecycle component into a to method on the observable (single, etc). but instead of returning an observable only return an object that has the subscribe methods as I described above. This would force inserting the lifecycle at the end of the chain it could actually do a real unsubscribe.

The only limitation then from the RxJava side is that all the "to" methods take a Function which means you cannot have a single object instance that can be passed to Observable.to, Single.to, etc. I worked on a PR for fixing that but was not happy with it.

akarnokd commented 7 years ago

I think this problem should be solved outside of RxJava. We provide state peeking methods (+2 kinds in 2.0.1) and you may want to consider building a "framework" which does send/umap style calls:

lifecycle.with(Flowable.just(1).map(v -> field.getValue().doOnNext(s -> textBox.setText(s)));

lifecycle.events((Observable<AndroidEvents> oe) -> oe.map().filter()...);

Basically, you either give a complete flow to a lifecycle handler which will take care of subscribing and unsubscribing your sequence. Alternatively could provide a transformer which gets an established sequence and you transform it as you whish but return the reactive type which will be consumed and cancelled by the lifecycle handler.

ZacSweers commented 7 years ago

When you say cancelled, wouldn't that still involve sending the terminal event downstream per rxjava semantics since we can no longer unsubscribe downstream?

ZacSweers commented 7 years ago

I'm going to close this. I agree it's better solved outside of RxJava, so we're going to do that. Likely it'll be with some sort of bytecode weaving or just plain old fork with the methods we need added onto Subscription/Disposable, something that gets us what Kotlin's extension functions would let us do otherwise. Sending completions downstream makes Single/Completable nigh unusable due to the error, and we're explicitly trying to avoid having to manually capture the observable or its resulting subscription/disposable for API overhead reasons (trying to make it as frictionless as possible while easy to catch issues with lint).

What we're shooting to end up with is something like .subscribe().with(lifecycle).