An example where using `cancel` would be appropriate:
```javascript
// Flatten a nested observable Observable<Observable<T>> to Observable<T>, which
// completes when an inner observable completes without being superseded by
// a subsequent inner observable.
function switchLatest2(observable) {
  return new Observable(sink => {
    let inner = null;
    let outer = observable.subscribe(prime(function*() {
      try {
        while (true) {
          let value = yield;
          // A new inner observable supersedes the old one: drop the old
          // connection without running its finally block.
          if (inner)
            inner.cancel();
          inner = value.subscribe(prime(function*() {
            try {
              while (true)
                sink.next(yield);
            } catch (x) {
              sink.throw(x);
            } finally {
              sink.return();
            }
          }));
        }
      }
      catch (x) { sink.throw(x) }
      finally { sink.return() }
    }));
    return _=> {
      if (inner)
        inner.cancel();
      outer.cancel();
    };
  });
}
```
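The examples in this thread use a `prime` helper that is not defined here. A minimal sketch, assuming `prime` just creates the generator and runs it to its first `yield`, so that the first value sent with `next(v)` is actually received (the name comes from the thread; the body is an assumption):

```javascript
// Hypothetical helper: start a generator function and advance it to its
// first `yield`, so the first value sent with next(v) is not dropped.
function prime(generatorFunction) {
  const generator = generatorFunction();
  generator.next(); // runs the body up to the first `yield`
  return generator;
}

// Usage: a primed generator acting as an observer that records pushed values.
const received = [];
const observer = prime(function* () {
  try {
    while (true) received.push(yield);
  } finally {
    received.push("done");
  }
});
observer.next(1);
observer.next(2);
observer.return(); // runs the finally block
```

Without the initial `next()`, the first pushed value would be consumed just to start the generator and would never reach the `yield`.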
@jhusain @blesh Thoughts?
So my concern with both this and the implementation where `unsubscribe` calls `return` or `dispose` is that it seems like we're adding strange rules to support duality.
To my mind, there is a producer (the source in the observable) and a consumer (the entity that owns the subscription object). When it comes to terminal events for the observable, the producer has a need to notify of success or error, but nothing else. The consumer may just want to cancel/dispose and walk away, but has no business telling the producer that it's completed either successfully or unsuccessfully. So it seems like these behaviors are unidirectional. The consumer, after all, is the original creator of the observer and its handlers, so why would it need to call something that automatically invokes those handlers in some way, when it could invoke them anytime it wants?
It might be useful to enable the consumer to notify the producer that it would like the producer to complete successfully, but I'm not sure in what cases that would be true. If this really is the case, why not add `next` on the subscriber and complete the transformation from subscriber to observer? That's something I asked @jhusain about last week.
The interesting thing from an implementor's perspective is that if `return`, `throw`, `cancel`/`dispose`, and `next` are on the subscriber, it becomes the same as the "safe" Observer that is used internally by the library. The safe observer is a wrapped version of the observer passed to your `subscribe` or `@@observer` methods, which makes some guarantees, such as not being able to call `next` after completion.
So... the implementation becomes easier, which I like... but that's not a good reason to go with this design.
One of the better things about a design with a simple Subscription object or function is it becomes easy to create compositional abstractions over that type. You can do things like create a "composite subscription" that has the same interface, but enables one to cancel multiple subscriptions simultaneously. Or more importantly for async, perhaps the serial subscription.
If the subscription suddenly had 3 methods, how would that compose in those scenarios? It might not be something to worry about in a native type, I suppose, but that doesn't mean it's not something to consider when the native type is something so extensible.
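To make the composition point concrete, here is a minimal sketch of a "composite" and a "serial" subscription built over a subscription type that has only an `unsubscribe` method. The class names echo Rx's `CompositeDisposable`/`SerialDisposable`, but this code is a hypothetical illustration, not RxJS's implementation:

```javascript
// Hypothetical: dispose many subscriptions at once through one handle.
class CompositeSubscription {
  constructor() {
    this.subscriptions = [];
    this.closed = false;
  }
  add(subscription) {
    if (this.closed) subscription.unsubscribe(); // already closed: dispose immediately
    else this.subscriptions.push(subscription);
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    const subs = this.subscriptions;
    this.subscriptions = [];
    for (const s of subs) s.unsubscribe();
  }
}

// Hypothetical: holds at most one subscription; setting a new one disposes the old.
class SerialSubscription {
  constructor() {
    this.current = null;
    this.closed = false;
  }
  set(subscription) {
    if (this.closed) {
      subscription.unsubscribe();
      return;
    }
    const previous = this.current;
    this.current = subscription;
    if (previous) previous.unsubscribe();
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    if (this.current) this.current.unsubscribe();
    this.current = null;
  }
}

// Usage with trivial subscriptions that record when they are disposed.
const log = [];
const sub = name => ({ unsubscribe: () => log.push(name) });

const composite = new CompositeSubscription();
composite.add(sub("a"));
composite.add(sub("b"));
composite.unsubscribe(); // disposes a, then b

const serial = new SerialSubscription();
serial.set(sub("x"));
serial.set(sub("y"));    // disposes x
serial.unsubscribe();    // disposes y
```

With `return`, `throw`, and `cancel` on the subscription, each of these containers would have to decide how to forward all three methods to its children, which is the composition concern raised above.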
It seems to me like current ES6 generators themselves aren't complete in the realm of completion/termination. If you have a generator that is using some scarce resource under the hood, there is no clean way (AFAIK) to simply finalize that generator and clean up the scarce resource without calling either `return` or `throw` on its iterator. So the subscriber in that case is telling the generator it's "successful" or "in error", but only because it can't simply abort and clean up. It's a really weird thing. So you're forced to actually call either `next`, `return`, or `throw` and analyze what was sent from the consumer to trigger cleanup.
```javascript
function* weirdScience() {
  var scarceResource;
  try {
    scarceResource = new ScarceResource();
    while(true) {
      yield scarceResource.getStuff();
    }
  } catch (err) {
    // if the consumer calls `iterator.throw("abort");` just clean up
    if(err !== 'abort') {
      throw err; // actual error!
    }
  } finally {
    scarceResource.cleanUp();
  }
}
var g = weirdScience();
console.log(g.next());
// nah, abort this thing
g.throw('abort');
```
What's more interesting about the above example is that if you remove the `console.log(g.next());`, at least in Babel, the `catch` block is never hit; the value passed to the `throw` method is just thrown as unhandled. ... and I doubt that's something we'd want to account for in duality? I dunno.
...
And I just noticed it's 1:15AM, so my brain dump above may be incoherent. I'll look at this more in the morning. :) haha.
@blesh while I generally follow your argument and find it pretty reasonable, I just wanted to give your 1:15am brain some help on generators. You don't want the catch block; just finally
.
```javascript
function* weirdScience() {
  var scarceResource;
  try {
    scarceResource = new ScarceResource();
    while(true) {
      yield scarceResource.getStuff();
    }
  } finally {
    scarceResource.cleanUp();
  }
}
var g = weirdScience();
console.log(g.next());
// If you want to cancel, without any error getting thrown, use return:
g.return();
// If you want to abort, throwing an error, use throw instead
// (on a generator that is still running):
// g.throw(new Error('abort abort!'));
```
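A runnable version of the cleanup pattern above, assuming a hypothetical stand-in `ScarceResource` that just records when it is cleaned up. It shows that both `return()` and `throw()` run the `finally` block; with `throw()`, the error then propagates back out to the caller. The resource is created before the `try` here so that `finally` never sees an undefined resource:

```javascript
// Hypothetical stand-in for the "scarce resource" in the example above.
class ScarceResource {
  constructor(log) {
    this.log = log;
    this.count = 0;
  }
  getStuff() {
    return ++this.count;
  }
  cleanUp() {
    this.log.push("cleanUp");
  }
}

function* weirdScience(log) {
  const scarceResource = new ScarceResource(log);
  try {
    while (true) yield scarceResource.getStuff();
  } finally {
    scarceResource.cleanUp(); // runs on return() and on throw()
  }
}

const log = [];

// Cancel quietly: return() resumes the generator with a return completion.
const g1 = weirdScience(log);
g1.next();
const r = g1.return("cancelled"); // { value: "cancelled", done: true }

// Abort with an error: throw() runs finally, then rethrows to the caller.
const g2 = weirdScience(log);
g2.next();
let caught = null;
try {
  g2.throw(new Error("abort abort!"));
} catch (e) {
  caught = e.message;
}
```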
@zenparsing
So to be perfectly clear: there are now three ways to cancel an observer from the producer, and none of them is sound (for example, if an observable has multiple subscribers, one can force-cancel it for the others)?
(Don't have an opinion yet, trying to understand)
As for `switchLatest2`, would you mind explaining why this is better than calling `return` on the inner observable?
@blesh
The consumer may just want to cancel/dispose and walk away, but has no business telling the producer that it's completed either successfully or unsuccessfully.
This is my intuition and my experience with Rx too. It's also the only cancellation outlook I find easy to reason about.
The consumer, after all, is the original creator of the observer and its handlers, so why would it need to call something that automatically invokes those handlers in some way, when it could invoke them anytime it wants?
It's possible that you'd get a subscription but not create the observer yourself (à la `fromEvent`). Would this be a good example of when it'd be beneficial?
One of the better things about a design with a simple Subscription object or function is it becomes easy to create compositional abstractions over that type. You can do things like create a "composite subscription" that has the same interface, but enables one to cancel multiple subscriptions simultaneously. Or more importantly for async, perhaps the serial subscription.
I want to point out that in promises (yes yes, but bear with me) wrapping things is typically simpler (much smaller API) and still very few people can write compositions well. Implementing `Promise.all` or `Promise.race` (and most of the harder aggregates) takes less than 10 LoC, but in my experience still very few people can write them correctly. Having a surface area of 3 methods on the subscription (on top of the complexity of using observables correctly anyway) sounds like it'd make writing these functions correctly hard for users. So overall, a very good point.
It seems to me like current ES6 generators themselves aren't complete in the realm of completion/termination. If you have a generator that is using some scarce resource under the hood, there is no clean way (AFAIK) to simply finalize that generator and clean up the scarce resource without either calling return or throw on its iterator.
Correct, this is my understanding too. You either pass it a token through `next` that it has to check for ("poison pilling"), or you call `.return` on it. Note that `.return` on the generator interface doesn't have to actually call `return` on the generator. For example, one approach we take at bluebird is to reference-count the subscribers and only call `return` on the generator when all the subscribers are no longer interested in the subscription (that is, have called `cancel`).
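A minimal sketch of the reference-counting idea, assuming a single shared generator and consumers that each receive a handle with `cancel()`. The names `refCounted` and `acquire` are hypothetical; this is not bluebird's actual code:

```javascript
// Hypothetical: share one generator among several consumers and only call
// return() on it once every consumer has cancelled.
function refCounted(generator) {
  let count = 0;
  return {
    acquire() {
      count++;
      let cancelled = false;
      return {
        cancel() {
          if (cancelled) return; // cancel is idempotent per consumer
          cancelled = true;
          if (--count === 0) generator.return(); // last one out runs finally
        },
      };
    },
  };
}

const events = [];
function* source() {
  try {
    while (true) yield;
  } finally {
    events.push("finally");
  }
}

const gen = source();
gen.next(); // start it so that return() lands inside the try/finally
const shared = refCounted(gen);
const a = shared.acquire();
const b = shared.acquire();
a.cancel();
a.cancel();                       // ignored: already cancelled
const afterFirst = events.length; // 0, because b is still interested
b.cancel();                       // count reaches 0: generator.return() runs finally
```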
So the subscriber in that case is telling the generator it's "successful" or "in error", but only because it can't simply abort and clean up. It's a really weird thing. So you're forced to actually call either next, return or throw and analyze what was sent from the consumer and trigger clean up.
Yes, I agree it's weird. Reiterating: you don't have to actually call `.return` on the generator (like this proposal says, IIUC) if `return` is called on the subscriber. It's just an interface :)
... and I doubt that's something we'd want to account for in duality? I dunno.
Yes, I opened https://github.com/zenparsing/es-observable/issues/33 for the more general issue.
Very well written analysis btw, I generally agree.
@blesh
The consumer, after all, is the original creator of the observer and its handlers, so why would it need to call something that automatically invokes those handlers in some way, when it could invoke them anytime it wants?
Yes - I completely understand this point of view. But...
Let's take it for granted that we want to allow the use of generator-function generators as observers (and we should). Then the observer can be a state machine which might need to be cleaned up upon cancellation (i.e. we might want the finally blocks of the generator function to execute). Furthermore, implementation using generator functions creates a distinction between the "subscriber" and the "observer". In my mind, this forces us to consider three separate roles:
Let's take the RxJS approach and collapse roles 2 and 3 (so that there is no separate "subscriber"). If I want to implement a generator-function observer which cleans up on cancellation, and I want to unsubscribe, I have to do this:
```javascript
let g;
let sub = observable.subscribe(g = prime(function*() {
  try {
    // A loop with yield
  } finally {
    // Cleanup
  }
}));
// Later...
sub.unsubscribe();
g.return();
```
Which is really horrid.
The point of this strawman is that it allows the subscriber to determine what should happen from the observer's point of view on cancellation. We can simply write the previous example as:
```javascript
let sub = observable.subscribe(prime(function*() {
  try {
    // A loop with yield
  } finally {
    // Cleanup
  }
}));
// Later...
sub.return(); // Run the finally blocks
```
which is much nicer.
There are other cases where we don't want the finally blocks to run, like the `switchLatest2` example I posted earlier. In that case, we can use `subscription.cancel()`. It seems to me that this proposal allows us to ergonomically cover the use cases we want for cancellation.
why not add next on the subscriber and complete the transformation from subscriber to observer
Because it really doesn't make much sense to allow the subscriber to push data values to the consumer. I think it does make sense to allow it to push abrupt completions, though.
If you have a generator that is using some scarce resource under the hood, there is no clean way (AFAIK) to simply finalize that generator and clean up the scarce resource without either calling return or throw on its iterator.
That's true, and it's part of the reason I put this strawman out there: if you're using generator functions, then "terminate" doesn't help, because a generator function would never be able to distinguish between "terminate" and "return". Consequently, you'd still be forced to use some state tracking flag in the outer scope to tell the difference, which is something that we were trying to avoid in the first place.
if you remove the console.log(g.next());, at least in Babel, the catch block is never hit
Yeah, that's because before you execute `g.next()`, the generator is paused at the beginning of the body, before the `try` statement.
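A small sketch of that behavior. This is standard generator semantics rather than something Babel-specific: a generator that has not been started yet is suspended before its `try`, so `throw()` completes it immediately and the error goes straight back to the caller, while a started generator handles the error in its `catch`:

```javascript
const log = [];
function* guarded() {
  try {
    yield 1;
  } catch (err) {
    log.push("caught " + err);
  }
}

// Not started: suspended before the `try`, so the catch block never runs.
const fresh = guarded();
let escaped = null;
try {
  fresh.throw("abort");
} catch (err) {
  escaped = err;
}

// Started: suspended at the `yield` inside the `try`, so catch runs.
const started = guarded();
started.next();
const result = started.throw("abort"); // { value: undefined, done: true }
```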
@benjamingr
So to be perfectly clear: there are now three ways to cancel an observer from the producer, and none of them is sound (for example, if an observable has multiple subscribers, one can force-cancel it for the others)?
No. The abrupt completion methods ("return" / "throw") would only affect the individual subscription in question, and would only be visible to that subscription's observer.
As for `switchLatest2`, would you mind explaining why this is better than calling `return` on the inner observable?
The idea is that you only want to invoke `sink.return()` if the inner observable completed naturally, with an EOF from the data source. When you get a new inner observable, you don't want the `finally` block to run.
@benjamingr
The switchLatest2 example is meant to demonstrate that there are examples where you really don't want to run finally blocks on unsubscription, but you do want to run finally blocks if the stream successfully completes.
@zenparsing do you have any other examples where we'd want to `cancel` without hitting the `finally` block? In particular, user-land use cases? The `cancel` semantic solves the problem in `switchLatest2`, but it's an implementation detail. Frankly, I much prefer the `SerialDisposable` or `SerialSubscription` method of doing that, as that particular construct is more broadly useful (IMO).
I'm also trying to think of other examples... Maybe you want to cancel a multiplexed data stream, but not close an underlying socket? I'm uncertain.
You don't want the catch block; just finally.
@domenic ... haha.. yeah thanks, the catch didn't make any sense there, you're right. I mean, it did something, but it was unnecessary.
[stuff] Which is really horrid. ... [stuff] ... which is much nicer.
@zenparsing I think I more clearly understand the reasoning for the desired behavior now because of your example of using a generator function to produce an observer. I feel silly I hadn't thought of that use case (triggering a `finally` block in an observer produced by a generator).
... but it does bring me back to the question I was hinting at earlier: is the goal of `cancel` to be the "dual" of simply walking away from an Iterator and not `next`-ing values? Because in that case, no cleanup would be called in the Iterable. In other words, since you can't "stop pulling," you're going to signal that the producer should simply "stop pushing"? It seems like a producer would then "know enough" to clean itself up afterwards, though.
(As I understand them)

- `cancel()`: producer cleans up, does not signal the consumer in any way.
- `return(v)`: producer cleans up, signals the consumer of its successful completion (via the `return` handler).
- `throw(err)`: producer cleans up, signals the consumer of an errant completion (via the `throw` handler).

vs.

- `unsubscribe`: producer cleans up; producer signals the consumer with `dispose` (non-descript completion) if a `dispose` handler is present, otherwise signals the consumer with `return` (successful completion) if the `return` handler is present, otherwise does not signal the consumer in any way.

... just reading those two examples, I much prefer the first option.
I'm about to run to a meeting, but I want to note that @trxcllnt and I were just talking about this, and we were worried about the implications of eventing `return` back when it comes to a `mergeAll()` or `flatMap()` operation that has "inner observables and observers"... would those then be signalled of `return`? How does that work? What are the implications? It requires more thought for sure.
... continuing where I left off... I've typed out the following example of a `mergeAll`-type operation:
```javascript
function mergeAll(observable) {
  return new Observable(sink => {
    let inner = [];
    let outer = observable.subscribe(prime(function* () {
      try {
        while(true) {
          let value = yield;
          inner.push(value.subscribe(prime(function* () {
            try {
              while(true) {
                let innerValue = yield;
                sink.next(innerValue);
              }
            } catch(err1) {
              sink.throw(err1);
            } finally {
              sink.return();
            }
          })));
        }
      } catch(err2) {
        sink.throw(err2);
      } finally {
        sink.return();
      }
    }));
    // The cleanup function must be returned from inside the Observable
    // callback so that `inner` and `outer` are in scope.
    return _ => {
      inner.forEach(sub => sub.cancel());
      outer.cancel();
    };
  });
}
```
Looking at that, it seems like if one called `return()` on the subscription, it would only event out at the top level to the `return` handler, but it would hit all `finally` blocks because of the cleanup calling `cancel`... but what happens if you call `throw()`? Does it hit the catch block of each? Does it only hit the catch block of whatever inner observer it happens to be in, plus the outer observer?
... actually... looking at my above example, it looks like the `mergeAll` author would have to explicitly compose the `return` or `throw` through to the inner observables if that was a desired outcome. If I try to go the opposite way with an Iterable created by nesting generators, I'm left to compose the `throw` and `return` values through myself:
```javascript
function* outerGen() {
  function* innerGen(id) {
    try {
      while(true) {
        yield id;
      }
    } catch(err) {
      console.log('inner error ' + id, err);
    } finally {
      console.log('inner finally ' + id);
    }
  }
  try {
    var gen1 = innerGen(1);
    var gen2 = innerGen(2);
    while(true) {
      yield gen1.next().value;
      yield gen2.next().value;
    }
  } catch (err) {
    // compose the error through to the inner generators
    gen1.throw(err);
    gen2.throw(err);
    console.log('outer error', err);
  } finally {
    // compose the finalization through
    gen1.return();
    gen2.return();
    console.log('outer finally');
  }
}
var g = outerGen();
console.log('->', g.next());
console.log('->', g.next());
console.log('!!!', g.throw('bad'));
```
It feels like I'm doing something wrong though... so I probably am. Just trying to understand the problem space.
@zenparsing @blesh `unsubscribe` travels "up" the Subscription chain (starting with the consumer, ending with the producer). How do `return` and `throw` operate with respect to the Subscription chain?
@trxcllnt ... if you look at my example a few comments above, it seems to not do anything unless you explicitly build an operator to do so.
All: I hope you don't mind @trxcllnt and me bringing operator and subscription chaining into this discussion, because while it's technically orthogonal to the proposal/spec, it's definitely the most inevitable use case for Observables.
@blesh @trxcllnt No - this is good stuff : )
So first, I think `mergeAll` needs to do nothing in the finally clause of the inner observable, right? Because `mergeAll` shouldn't stop when one of the inner observables completes.
Looking at that, it seems like if one called return() on the subscription, it would only event out at the top level to the return handler, but it would hit all finally blocks because of the clean up calling cancel
Clarifying question: by "subscription" do you mean the subscription to mergeAll, the outer subscription, or one of the inner subscriptions?
@trxcllnt For this proposal, calling `return` or `throw` on the subscription is exactly like the producer calling `return` or `throw` on the "safe" observer.

So the `return`/`throw` flows "down" the subscription chain, and then the cleanup function runs going back "up" the subscription chain.
@zenparsing Yes, I meant the subscription to `mergeAll`.
@blesh Let's say we have this:
```javascript
let sub = mergeAll(sources).subscribe(prime(function*() {
  try {
    // ...
  } finally {
    // ...
  }
}));
// Much later:
sub.return();
```
The `return` call will execute the `finally` block inside of the generator function, and then it will execute `mergeAll`'s cleanup function. That function will then `cancel` the outer and inner subscriptions, which is exactly what we want it to do: we don't want those `finally` blocks to run.
It seems to me that whoever holds the subscription knows how the observer should be terminated, and by giving the subscription this functionality we can sidestep the question of how to automatically gracefully shut down the observer.
@zenparsing I'm not sure we need to introduce `throw` or `return` to ensure the `finally` block is executed. Simply calling `unsubscribe` should trigger the generator's (observer's) `finally` block, whether it was invoked by the source Observable calling `throw`/`return` or by the consumer disposing, just like Rx's `finally` operator does today.
@trxcllnt ... I believe that's what @jhusain was saying was missing from the current Generator spec: a `dispose` method (in this thread, I think we're calling it `cancel`) that would do just that, only execute the `finally` block.
@blesh @trxcllnt Right. I imagine this is like a balloon that we're squeezing in one place; no matter where we squeeze, the balloon will expand out somewhere else.
The force that's adding complexity is the desire to cleanly shut down observers upon termination (in most scenarios).
We can leave the subscription alone (with only an "unsubscribe" method), but that means that we need to add a "terminate" method to generators and observers.
We can leave generators alone, but that means that we need to add additional methods to the subscription ("return" and "throw").
A problem I have with the first option (adding a terminate or dispose method to generators) is that generator functions are still not able to discriminate between termination and completion:
```javascript
observable.subscribe(prime(function*() {
  try {
    // ...
  } finally {
    // Did I get here from a "return", or "terminate"? I have no way of knowing!
  }
}));
```
Also, I think it's going to be difficult to justify adding a "terminate" method which is semantically identical to "GeneratorPrototype.return".
@blesh @zenparsing Right, since Generators can't distinguish between `return` and `unsubscribe`, what would `return` and `throw` on Subscriptions accomplish that `unsubscribe` doesn't?

When someone calls `unsubscribe`, the Subscription still has to resort to calling `return` on the Generator it wraps. Since calling `return` on a Generator is sufficient to trigger the `finally` block, our cleanup use case is satisfied.
The only actor that should be able to `throw` on a Generator is the source Observable. How would a Generator distinguish between errors from its source Observable and errors from whoever is currently managing the Subscription? Subscriptions are resources; they shouldn't have insight into the behavior they're fronting.
since Generators can't distinguish between return and unsubscribe, what would return and throw on Subscriptions accomplish that unsubscribe doesn't?
The idea is that the holder of the subscription usually knows how best to terminate the connection.
Let's say that `unsubscribe` always invokes `observer.return()`, or even `observer.dispose()` (it really doesn't matter). The `switchLatest2` combinator above becomes:
```javascript
function switchLatest2(observable) {
  return new Observable(sink => {
    let inner = null,
        canceled = false;
    let outer = observable.subscribe(prime(function*() {
      try {
        while (true) {
          let value = yield;
          if (inner) {
            // Set the flag before unsubscribing, because unsubscribe
            // synchronously runs the inner finally block.
            canceled = true;
            inner.unsubscribe();
          }
          canceled = false;
          inner = value.subscribe(prime(function*() {
            try {
              while (true)
                sink.next(yield);
            } catch (x) {
              sink.throw(x);
            } finally {
              // Notice how I need to keep track of whether we unsubscribed
              // or completed with a state variable in the outer scope.
              // This is super ugly!
              if (!canceled)
                sink.return();
            }
          }));
        }
      }
      catch (x) { sink.throw(x) }
      finally { sink.return() }
    }));
    return _=> {
      if (inner)
        inner.unsubscribe();
      outer.unsubscribe();
    };
  });
}
```
Notice how we still need to maintain a state variable in the outer scope to tell the difference between cancelation and completion. But it's clear in this case that the holder of the subscription knows how best to terminate the connection. For this combinator it should just drop the connection rather than forcing the finally block to run.
The only actor that should be able to throw on a Generator is the source Observable. How would a Generator distinguish between errors from its source Observable vs whoever is currently managing the Subscription?
I agree there is a principle of least authority smell here. I think this is a good counter argument so I'm going to take some time to think of a response. : )
I think the guidance here is that in most cases the consumer should only ever call `dispose()` and not `return()` or `throw()`, but there may be cases where the consumer will want to do those things. To me, this seems reasonable. And it has the added benefits of: 1) not introducing another type into JavaScript, because everything is Generators in and Generators out; and 2) fixing Generator to have the `dispose()` method it was missing.
I just want to point out that if we can consolidate this to expose `next` somehow and allow for `await*` somehow, we can have one type (no more `forEach` and `subscribe`) that can be awaited, which would be cool.
@zenparsing why is `sink.return()` in the inner `finally` block? Shouldn't it be after the while-loop?
```javascript
/* ...snip */
inner = value.subscribe(prime(function*() {
  let returnError = undefined;
  let returnErrorHappened = false;
  try {
    while (true) {
      sink.next(yield);
    }
    try {
      sink.return();
    } catch(e) {
      returnError = e;
      returnErrorHappened = true;
    }
  } catch (x) {
    sink.throw(x);
  } finally {
    if(returnErrorHappened) {
      throw returnError;
    }
  }
}));
```
@blesh If native Generators are to be used as Observers, they'll need a `dispose` method anyway. As I illustrated above, I don't see a compelling argument for sending a message along with termination. The Generator won't be able to distinguish between messages from the Observable and messages from the Subscription.
@zenparsing Apologies for the flood of comments, I'm just now able to sit down and really think through this stuff.
I imagine this is like a balloon that we're squeezing in one place; no matter where we squeeze, the balloon will expand out somewhere else.
This problem was solved by the relationships between Observables, Observers, and Disposables in Rx. The creation-subscription-disposal triangle cleanly models all aspects of push-based iteration, I don't understand why we'd deviate here.
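```javascript
// Wrapped in a (hypothetically named) generator function so that `yield` is
// legal; each part of the body maps onto an Rx observer callback.
function* countToTen() {
  try {
    for(let i = 0, n = 10; i < n; ++i) {
      // onNext
      yield i;
      if(i === n - 1) {
        // onCompleted
        return;
      }
    }
  } catch(e) {
    // onError
    throw e;
  } finally {
    // dispose
  }
}
```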
The force that's adding complexity is the desire to cleanly shut down observers upon termination (in most scenarios).
We can leave the subscription alone (with only an "unsubscribe" method), but that means that we need to add a "terminate" method to generators and observers.
Since a Generator's `finally` block can't distinguish between its (inner) `return` and an external party invoking `return` early, doesn't `return` fully model the behavior of `dispose` for Generators? Perhaps adding a `dispose` function that's an alias for calling `generator.return(undefined)` would help, but it's not mandatory.
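A small sketch of that point, assuming `dispose` is just a helper that aliases `return(undefined)` (hypothetical; it is not part of the generator API). The observer's `finally` block runs identically whether the producer completes the stream or the consumer disposes early:

```javascript
// Hypothetical helper: `dispose` as a plain alias for return(undefined).
function dispose(generator) {
  return generator.return(undefined);
}

// An observer written as a generator function.
function* observer(log) {
  try {
    while (true) log.push(yield);
  } finally {
    // Reached both when the producer completes the stream via return() and
    // when the consumer disposes early; nothing here tells the two apart.
    log.push("finally");
  }
}

const completedLog = [];
const completed = observer(completedLog);
completed.next();     // prime
completed.next("a");
completed.return();   // producer signals completion

const disposedLog = [];
const disposed = observer(disposedLog);
disposed.next();      // prime
disposed.next("a");
dispose(disposed);    // consumer walks away early
```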
We can leave generators alone, but that means that we need to add additional methods to the subscription ("return" and "throw").
I think I covered this in my last few comments.
I'm a little concerned by how much I seem to disagree with this idea. Am I totally missing the point, or are we all on the same page?
I'm going to close this strawman, since I no longer want to pursue it.
I think the principle of least authority counter-argument is compelling. Even more, I see this as yet another attempt to patch a problem with the observer/generator design which adds more unnecessary complexity to the API.
@trxcllnt
I'm a little concerned by how much I seem to disagree with this idea. Am I totally missing the point, or are we all on the same page?
I don't think we are on the same page. I think there's a fundamental confusion embedded within the design which I'll try to explain in another thread.
@zenparsing
Yes, I agree, while the design with generators is very elegant and clever it is also very confusing and the complexity needs to either be hidden better or avoided altogether somehow.
This is a strawman alternative proposal to adding a "terminate" method to observers. It may have fatal problems.
The primary issue with adding "terminate" to observers is the fact that we need to extend the ES generator interface with additional complexity which is only really beneficial for Observable.
Proposal

The `subscribe` method of observables returns a Subscription object. Subscription objects expose three methods: `return`, `throw`, and `cancel`.

A Subscription object may be used to early-terminate the communication between an observable and an observer. Each method invokes the cleanup action specified by the observable. In addition:

- `subscription.return` calls the `return` method of the observer with the supplied argument. It terminates the data stream with a completion value.
- `subscription.throw` calls the `throw` method of the observer with the supplied argument. It terminates the data stream with an error.
- `subscription.cancel` does not invoke any methods on the observer.

The advantage of this interface is that it lets the holder of the subscription object determine how the observer should be shut down, if at all. This allows a wider range of use cases to be expressed without adding additional complexity to the generator type or adding state variables to keep track of which kind of completion was intended.
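A minimal sketch of the proposed semantics, assuming the observer is a primed generator and the observable supplies a cleanup function. `createSubscription` and `makeObserver` are hypothetical names; this illustrates the strawman's behavior and is not a spec implementation. As described in the thread, `return`/`throw` reach the observer first and the cleanup runs afterwards, while `cancel` runs only the cleanup:

```javascript
// Hypothetical: a subscription with the three early-termination methods.
// Each method runs the observable's cleanup at most once.
function createSubscription(observer, cleanup) {
  let closed = false;
  const close = () => (closed ? false : (closed = true));
  return {
    return(value) {
      if (!close()) return;
      try {
        observer.return(value); // terminate with a completion value
      } finally {
        cleanup();
      }
    },
    throw(error) {
      if (!close()) return;
      try {
        observer.throw(error); // terminate with an error
      } finally {
        cleanup();
      }
    },
    cancel() {
      if (!close()) return;
      cleanup(); // the observer is not notified at all
    },
  };
}

const log = [];
function makeObserver(name) {
  const g = (function* () {
    try {
      while (true) log.push(name + ":" + (yield));
    } catch (e) {
      log.push(name + ":error " + e);
    } finally {
      log.push(name + ":finally");
    }
  })();
  g.next(); // prime
  return g;
}

const s1 = createSubscription(makeObserver("a"), () => log.push("a:cleanup"));
s1.return(); // a's finally block runs, then the cleanup

const s2 = createSubscription(makeObserver("b"), () => log.push("b:cleanup"));
s2.cancel(); // only the cleanup; b's finally block never runs

const s3 = createSubscription(makeObserver("c"), () => log.push("c:cleanup"));
s3.throw("boom"); // c's catch and finally blocks run, then the cleanup
s3.cancel();      // ignored: already closed
```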