tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/
3.07k stars 90 forks source link

Exceptions and closures cause cleanup, in addition to cancelation? #109

Closed domenic closed 7 years ago

domenic commented 8 years ago

Apologies if this has been discussed before; I'll gladly take pointers to existing issues.

When talking with @jhusain about the cancelation design, I was told that not only does explicit cancelation cause the cleanup function to run, but so does throwing an error or reaching the end of the observable. So, given the following:

document.body.on("click").subscribe({
  next(e) {
    if (Math.random() > 0.5) {
      throw new Error();
    }
    console.log("clicked", e);
  }
});

If I understand correctly, we have a 50% chance each click of stopping the click-stream forever (since the cleanup operation here is to do removeEventListener).

This seems like a big mismatch with the event semantics of the DOM, where errors in event listeners get reported, but do not cause all future events to stop firing.

It's less clear to me whether the cleanup function should be called after the event stream ends, because in the DOM event streams don't generally end, but even in cases where they kind of do (e.g. progress events), there's no need to unsubscribe---garbage collection removes the listeners for us anyway.

jhusain commented 8 years ago

Note that the cleanup logic applies to the current subscription. It does not need to terminate the underlying event stream. This is not dissimilar from arrays. I can request an iterator from an Array which terminates. This doesn't mean that the array is destroyed. In the meantime people can modify it, and the next time I retrieve an iterator for the array it may have different data. Note that in this case the underlying array is a hot data source, because the data that comes out of the iterator depends on when it is requested.

Note that only the current subscription to the observable terminates in the event of an error. There is nothing stopping a consumer from immediately resubscribing to the observable should they want to continue listening for future events. There is no risk that they will miss events either, because it is possible to subscribe synchronously.

domenic commented 8 years ago

Right, I understand how it's possible to work around this. But why is it a good idea? It's certainly an impedent to trying to create an observable system for DOM events, due to the broken programmer expectations.

zenparsing commented 8 years ago

I'm not aware of any previous discussions on this topic (and I'm surprised we haven't talked about it here).

In fact, the EventTarget implementation in terms of Observable I wrote messes this up. That doc can be repaired by just wrapping the handler.call call in a try-catch, I think.

I agree there is a clear mismatch between the DOM's handling of errors and errors in observables. I think the current behavior for Observable falls out of a couple of facts:

Imagine the following chain:

A => x => y => B

Where A is the producer, B is the sink, and both x and y are combinators (e.g. map). Each => represents an active subscription. Say B throws an error. The error travels backward down the chain.

Currently, we close each subscription (=>) as the error passes back through it.

For the sake of argument, let's say that we don't close the subscriptions as the error flows back through them. When the error gets to A, what if we then want to tear down the chain? How can we send a cancellation signal all the way through the final =>? The only channels we have are error, complete, and next, and none of those fit in this case. We'd need a new method representing a cancel completion, or something. 😄

@jhusain or @blesh: Do you know if other Observable-ish libraries have a different take on this issue?

staltz commented 8 years ago

xstream behaves differently. All streams in that library are hot, suitable for event sources like DOM clicks, and subscriber errors are not handled in any way.

This code does not perform cleanup when the subscriber/listener throws:

const stream = fromEvent(document, 'click');

stream.addListener({
  next(ev) {
    if (Math.random() < 0.5) {
      throw new Error(); 
    }
    console.log('click');
  },
  error() {},
  complete() {}
});

http://www.webpackbin.com/4yhdEb3cW

On the other hand, error thrown in one listener will affect sibling listeners subscribed to the broadcast from the source stream

const stream = fromEvent(document, 'click');

stream.addListener({
  next(ev) {
    if (Math.random() < 0.5) {
      throw new Error(); 
    }
    console.log('click');
  },
  error() {},
  complete() {}
});

stream.addListener({
  next(ev) {
    console.log('second click');
    // will not run if an error was thrown in the first listener
  },
  error() {},
  complete() {}
});

Also, @rpominov has built a Observable-ish library and has some ideas about expected/unexpected error handling in Promises/Observables, like https://github.com/tc39/proposal-observable/issues/104 and https://github.com/rpominov/fun-task/blob/master/docs/exceptions.md.

rpominov commented 8 years ago

Yeah, Kefir does not catch errors at all, and in this particular case behaves similarly to xstream.

zenparsing commented 8 years ago

I'm having trouble coming up with a killer argument for the current auto-closing behavior.

Supposing that we didn't force cleanup on exception, the user could opt-in to the current auto-closing behavior with a pretty simple combinator.

function closeOnError() {
    return new Observable(observer => this.subscribe({
        start(subscription) {
            this.subscription = subscription;
        },
        next(x) {
            try {
                observer.next(x);
            } catch (err) {
                try {
                    this.subscription.unsubscribe();
                } finally {
                    throw err;
                }
            }
        },
        error(x) { return observer.error(x) },
        complete(x) { return observer.complete(x) },
    }));
}

(Untested)

Usage:

// Closes the stream on the first thrown error
document.on('click')::closeOnError().subscribe(e => {
    if (Math.random() > 0.5) {
        throw new Error();
    }
    console.log("clicked", e);
});

@jhusain what do you think?

jhusain commented 8 years ago

l too am having difficulty making the argument for auto closing. Currently looking around for some strong use cases. Leaning towards changing this behavior to better match with event target semantics.

jordalgo commented 8 years ago

+1 that this type should have no opinion on errors in either Observers or Observables. Unexpected errors should be noisy and harmful (to the program) and therefore only protected against with explicit use of try/catch.

benlesh commented 8 years ago

So when a producer signals completion, meaning the observer can never, ever send another value, we're going to leave resources hanging out until the consumer explicitly tears them down?

Doesn't that mean consumers would always need to handle completions and errors manually tear down resources? Otherwise they risk leaks?

I don't really see what the point of removing auto teardown is

rpominov commented 8 years ago

Seems like we're discussing two topics here that should be separate:

1) Should observable tear down after first error? 2) Should exceptions thrown from whatever became observable errors?

I am not certain about answer to the first question. Automatic tear down seems like a reasonable option, but there also libraries like Bacon and Kefir that allow multiple errors as well as multiple values. They basically try to treat values and errors symmetrically.

As for exceptions I've expressed my opinion in #104 , it boils down to whether we want to support Either/Railways pattern or not: https://github.com/tc39/proposal-observable/issues/104#issuecomment-237843978 . In other words what is the semantics of errors: do they model built-in Either or async try-catch?

jordalgo commented 8 years ago

1) Should observable tear down after first error? 2) Should exceptions thrown from whatever became observable errors?

I feel like we're talking about 1. And I'd much rather there is a specific method that closes on error (like what @zenparsing demonstrated) only because making assumptions about how devs want to deal with errors is something this type should not be doing even if it means there might be memory leaks. Although, probably, bad practice some devs might want the subscription to stay open even after a error is thrown.

dinoboff commented 8 years ago

Does it mean Observable would switch from having two types of subscription ending (one expected, one unexpected) to having two types of emitted values (expected values and errors)?

If Observable are promise too, what would catch be used for?

benlesh commented 8 years ago

1) Should observable tear down after first error?

YES. Definitely. Otherwise the ergonomics of the type will be a hot mess, and there will be a huge memory leak footgun laying around.

const source = new Observable(observer => {
  let i = 0;
  const id = setInterval(() => observer.next(i++), 1000);
  return () => clearTimeout(id);
});

source.map(n => {
  throw new Error('I hope you like intervals running forever, noob Observable user!!! LAWL');
});

const subscription = source.subscribe({
  next(x) { console.log('Optimism!!!', x); }
});

Above it'll error on the first tick of the interval... but if you don't tear down, the interval keeps going.

mAAdhaTTah commented 8 years ago

@blesh In your example, wouldn't the subscriber just continue to receive errors? It wouldn't be a memory leak if the stream was still active. If the map threw an error e.g. on odd numbers, the subscriber would (assuming errors thrown become Observable errors) alternate between getting an error and getting a value. If thrown errors don't become Observable errors, then the Observables should cleanup after themselves, if they're going to stop emitting values to their subscribers.

If I understand correctly, the idea is expected errors and values can be propagated down those two channels without the Observable ending, but unexpected errors would crash everything. I don't think he means "tear down" so much as "end".

jordalgo commented 8 years ago

1) Should observable tear down after first error? 2) Should exceptions thrown from whatever became observable errors?

Perhaps it's difficult to separate these in this conversation. @blesh if the answer to both of the above is "No" then I don't consider your example a memory leak footgun as the leak isn't hidden. The program will keep throwing uncaught errors which is pretty hard not to notice if you're debugging. However, if the answer to 2 is Yes, then it is possible the memory leak could be hidden as the user could potentially have a no-op for their error consumer.

benlesh commented 8 years ago

In your example, wouldn't the subscriber just continue to receive errors?

No. The observer is closed after the first error passes through it. It cannot send more than one error.

Perhaps it's difficult to separate these in this conversation. @blesh if the answer to both of the above is "No" then I don't consider your example a memory leak footgun as the leak isn't hidden.

I didn't understand the second question, honestly. But the answer to 1 is an emphatic "yes".

rpominov commented 8 years ago

I didn't understand the second question, honestly. But the answer to 1 is an emphatic "yes".

Let me try to explain. The second question is about implementation of map and other combinators and subscribe method. Currently they usually have something like this inside:

map(fn) {
  // ...
  try {
    newX = fn(x)
  } catch (e) {
    // put error into observable
  }
  // ...
}

This behavior corresponds to answer "yes" to the second question. If we answer "no", we would change code above to:

map(fn) {
  // ...
  newX = fn(x)
  // ...
}

In this case the only way to put a error into an observable would be by calling observer.error(), or by using operators like flatMap which would also call observer.error() under the hood if they receive a error from spawned observable.

But this is really a separate subject, I think, which we should discuss in #104


YES. Definitely.

I agree. If we allow only one error per observable, in case of source.map(n => {throw ...}) or source.flatMap(n => obsWithError) the teardown function of source should be called.

If we allow more than one error per observable though, than teardown should not be called in this case source.map(n => {throw ...}), but should be in this one source.map(n => {throw ...})::closeOnError().

Update: runnable example of how this works in Kefir

benjamingr commented 8 years ago

Ok, so I've stared at this for a while now and I also find it really surprising that this wasn't brought up before. This is a really good point.

I dug our Rx code for the last hour and it all behaves in the same semantics @blesh described. It never really gave us issues as we monitor and treat any thrown exception in an event handler as an error anyway.

I wouldn't like to give up cleanup logic on exceptions. I think the way we approach this problem doesn't really depend on Rx per-se, async iterators have the exact same issue and so do regular iterators. Cleanup on errors is an extremely useful guarantee and I've used it (admittedly less so with DOM events) very liberally before (kind of like finally in async functions - or resource blocks in some other languages).

I think this is something the on implementation has to fix. It's not a lot of work to fix and I'm surprised my own code never bothered fixing it although I clearly remember having exceptions in event handlers and fixing bugs around them.

domenic commented 8 years ago

Why is this something the on implementation would fix, instead of something that would be fixed in observables? That seems to imply that in observables in general, an error in the next handler should cause cleanup behavior, but for DOM observables, it should not. That just seems confusing at best, and a sign of a mismatch between the web platform and observables at worst.

jhusain commented 7 years ago

It seems Observable is used in two contexts:

  1. As a way of consuming event streams in async functions
  2. As a push data stream used with callbacks

The forEach and subscribe methods respectively cover both these cases. I think we can agree that developers will expect the following code to log "error" to the console:

async function test() {
  try {
    await Observable.from([1,2,3]).forEach(x => {
      throw "error";
    });
  }
  catch(e) {
    console.log(e);  
  }
}

This matches the behavior of Array's synchronous forEach, and I believe any other behavior would be very unexpected - not to mention a refactoring hazard.

However if a developer is using subscribe and callbacks they may well expect EventTarget-like semantics. Under the circumstances I think subscribe should not autoclose. Any developer that wants to close can simply try/catch their observer methods:

let {cancel, token} = CancelToken.source();
observable.subscribe({
  next(v) {
    try {
      doSomethingThatMightFail(v);
    }
    catch (e) {
      cancel(new Cancel("Something went wrong"));
    }
}, token);

If subscribe doesn't tear down when an observer throws, Observable behaves just like EventTarget. This is currently how Kevin has implemented things, and I'm comfortable with the rationale. Under this proposal, the on method would do no error suppression. Thoughts @domenic ?

zenparsing commented 7 years ago

It might be helpful to look at how we would implement auto-close on next-throw if it wasn't provided by SubscriptionObserver:

new Observable(sink => {
  // Use this instead of sink.next
  function next(value) {
    try {
      sink.next(value);
    } catch (error) {
      try { sink.complete(); } // Or sink.error?
      finally { throw error; }
    }
  }
  return () => {
    // cleanup
  };
})

The wrapping of the SubscriptionObserver can be abstracted quite nicely:

new Observable(sink => {
  sink = AutoClosing(sink);
  return () => {
    // cleanup
  };
})

How would we implement no-close-on-throw, given the current spec? It seems to me like it would be quite difficult because the subscription is completely shut down on throw. I think you'd have to wrap the entire observable with a different subscribe implementation which re-subscribes the observer if it throws on next. I'm not even sure.

It looks to me like it's fairly easy to go the one way, and pretty hard to go the other. With that in mind, I'd argue for changing the behavior per #123 unless it can be shown that not performing cleanup on next-throw is a footgun.

jhusain commented 7 years ago

Agreed.

jhusain commented 7 years ago

I'd like to propose the following mental model: calling Observable.prototype.subscribe is like calling Symbol.iterator on an iterable, and calling Observable.prototype.forEach is like using a for...of statement on an Iterable.

If a consumer throws while consuming a value, note that the iterator does not self close.

let iterator = [1,2,3][Symbol.iterator]();
let { value } = iterator.next();
value.someFunctionThatDoesntExist(); // throws and leaves iterator open

This corresponds to the proposed subscribe behavior, where throwing does not close the subscription:

observable.subscribe({
  next(value) {
    value.someFunctionThatDoesntExist(); // throws and leaves subscription open    
  }
});

Let's contrast this with for...of...

for(value of [1,2,3]) {
  value.someFunctionThatDoesntExist(); // throws and closes iterator
}

...and forEach, each of which close the subscription if code handling the value throws:

observable.forEach(value => {
  value.someFunctionThatDoesntExist(); // throws and closes subscription
});

Seems like we have a straightforward correspondence here.

benlesh commented 7 years ago

@jhusain So we've spent a lot of time discussing what happens in an error in an operation like map or subscribe... but what happens when there's an error signaled by the producer? Does it behave differently? Does it still always clean up?

const source$ = new Observable((observer, token) => {
  token.promise.then(() => {
    console.log('will this be called?');
  });
  observer.throw(new Error('haha'));
});

source$.subscribe(); // here?
source$.forEach(() => { }); // what about here?

Or in the older model...

const source$ = new Observable((observer, token) => {
  setTimeout(() => observer.error(new Error('haha')));
  return () => {
    console.log('will this be called?');
  };
});

source$.subscribe(); // here?
source$.forEach(() => { }); // what about here?
jhusain commented 7 years ago

Yes. If an Observable implementation sends a error or complete notification then cleanup behavior is executed.

benlesh commented 7 years ago

Yes. If an Observable implementation sends a error or complete notification then cleanup behavior is executed.

:+1: Solid. Thanks

jordalgo commented 7 years ago

@jhusain - I didn't know that iterators had this behavior but if it's well known enough not to be a foot-gun then 👍