tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/

Should producer error close/cancel subscription? #127

Open jordalgo opened 7 years ago

jordalgo commented 7 years ago

This was partially discussed here and here, but I wanted to have a more focused conversation around explicit error calls as opposed to exceptions. My apologies, and please be gentle if I'm missing something blatantly obvious. In the current implementation, calling observer.error cancels the subscription and calls the cleanup method, but I wanted to discuss the pros (and cons) of leaving it open.

Pros for Not Closing

  1. Allow the producer to decide which errors are serious enough to close/cancel the subscription (by firing error and then complete). When dealing with asynchronous sequences of data (like reading a file), it's easy to see why the subscription must close if an error occurs, but it seems easy enough to have the publisher do this manually, subclass the type with this functionality, or create an operator like closeOnError.

  2. At the point that a producer is calling an observer's error, I feel like the actual "error", whatever it is, has been "caught" or at least anticipated; that is, it's not an unexpected/programmer error. So why is it assumed that this error will cause issues (memory leaks or unexpected behaviors) in the Observable if it doesn't end? This is in addition to the fact that the current implementation doesn't automatically try/catch around next calls.

  3. EventTargets don't have this behavior of automatically removing listeners on error. Take the video node, for example; if it emits an error, the listener isn't automatically removed. (also mentioned here)

  4. There is more freedom and power for both the producer and the observer. If errors didn't auto-cancel, observers would have the option of unsubscribing on error, and they wouldn't have to re-subscribe if they wanted to react to multiple producer errors. Producers that wanted to make errors (like server errors) available to the observer wouldn't have to wrap them in an object passed to next, e.g. observer.next({ error: '404', data: null }).

Counters:

  1. What should happen if the observer didn't implement an error handler? If these are "operational" errors, should they be thrown if there is no error handler? Maybe?

  2. This contradicts general promise and observable composition (which I'm assuming is probably a non-starter for most folks).

Hopefully long time RxJs contributors and Observable advocates have more counter arguments but I just wanted to throw this out there. Again, please be gentle.

benjamingr commented 7 years ago

This contradicts general promise and observable composition (which I'm assuming is probably a non-starter for most folks).

Why?

trxcllnt commented 7 years ago

@benjamingr because Rx guarantees deterministic memory management. Without teardown onError, unresolvable memory leaks can occur.

trxcllnt commented 7 years ago

The correct behavior is for the Observer to tear down when it receives an onError message. The consumer can choose to retry() onError, which just re-subscribes to the source. This is safe/correct for stateless computations, while still allowing state to be preserved (via multicast/refCount) for stateful computations.

jordalgo commented 7 years ago

@benjamingr I was thinking more that if an Observable could emit multiple errors, how would that work for the promise returned by forEach (as you can only reject once)? Or has that feature not been officially decided on yet?

jordalgo commented 7 years ago

@trxcllnt - Do you mind expanding a bit on that and/or providing some examples? I read something similar in the RxJs docs but had trouble imagining when that would occur, or how it's safer or more stateless.

trxcllnt commented 7 years ago

@jordalgo if an Observable could emit multiple errors

Per the Observable contract, an Observer's onError can only be called once, after which no more messages can be sent to that Observer.

Do you mind expanding a bit on that and/or providing some examples?

Sure, no problem. This example might seem trivial, but it's succinct and easy to follow. Let's say I'm consuming the values from computing the Fibonacci sequence over time and something goes wrong:

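function fibs(period = 0) {
  return Observable
    .interval(period)
    // Each tick advances the pair [i, n] to [n, i + n]; the tick value itself is ignored.
    .scan(([i, n]) => [n, i + n], [1, 0])
    // Emit only the first element of each pair: 0, 1, 1, 2, 3, 5, ...
    .pluck(0);
}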

fibs(100 /*ms*/).do((val) => {
    if (val === 13) {
      throw new Error('bad luck!');
    }
  })
  .subscribe({
    next: console.log.bind(console, 'value'),
    error: console.log.bind(console, 'error'),
    complete: console.log.bind(console, 'done')
  });
/*
value 0
value 1
value 1
value 2
value 3
value 5
value 8
error Error: bad luck!
*/

In this example, teardown on error ensures the interval Observable is cleaned up. Without it, we can't call clearInterval, so we'd have a ghost interval spooking up our app.
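To make the teardown guarantee concrete, here is a minimal sketch of an Observable-like type. The `MiniObservable` class is hypothetical (it is neither the proposal's implementation nor RxJS); it only shows that a terminal `error` closes the observer and runs the producer's cleanup, which is where `clearInterval` gets called. Values are pushed by hand through a captured observer so the demo runs synchronously; a real `interval` would push them from the timer.

```javascript
// Hypothetical MiniObservable: a sketch of the teardown guarantee only.
class MiniObservable {
  constructor(subscriber) {
    this._subscriber = subscriber; // producer: (observer) => cleanup function
  }

  subscribe(observer) {
    let closed = false;
    let cleanup = null;

    // Run the producer's cleanup at most once.
    const teardown = () => {
      if (cleanup) {
        const fn = cleanup;
        cleanup = null;
        fn();
      }
    };

    const guarded = {
      next(value) {
        if (!closed && observer.next) observer.next(value);
      },
      error(err) {
        if (closed) return;
        closed = true; // nothing is delivered after a terminal message
        try {
          if (observer.error) observer.error(err);
        } finally {
          teardown(); // e.g. clearInterval: no ghost timer left behind
        }
      },
      complete() {
        if (closed) return;
        closed = true;
        try {
          if (observer.complete) observer.complete();
        } finally {
          teardown();
        }
      },
    };

    cleanup = this._subscriber(guarded) || null;
    if (closed) teardown(); // producer terminated synchronously

    return {
      unsubscribe() {
        closed = true;
        teardown();
      },
    };
  }
}

// Usage: a producer that owns a timer and returns a cleanup for it.
const log = [];
let intervalCleared = false;
let producerObserver; // captured only so this demo can push values by hand

const ticks = new MiniObservable((observer) => {
  producerObserver = observer;
  const id = setInterval(() => {}, 100); // the resource that must be released
  return () => {
    clearInterval(id);
    intervalCleared = true;
  };
});

ticks.subscribe({
  next: (v) => log.push(`value ${v}`),
  error: (e) => log.push(`error ${e.message}`),
});

producerObserver.next(8);
producerObserver.error(new Error('bad luck!'));
producerObserver.next(13); // ignored: the observer is already closed
// log: ['value 8', 'error bad luck!'], intervalCleared: true
```

Without the `teardown()` call in `error`, the timer handle would stay registered and keep the process alive after the consumer has stopped listening.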

We could recover from the error with retry(), which would restart the fib sequence on error:

fibs(100 /*ms*/).do((val) => {
  if (val === 13) {
    throw new Error('bad luck!');
  }
})
.retry(1)
.subscribe({
  next: console.log.bind(console, 'value'),
  error: console.log.bind(console, 'error'),
  complete: console.log.bind(console, 'done')
});
/*
value 0
value 1
value 1
value 2
value 3
value 5
value 8
value 0
value 1
value 1
value 2
value 3
value 5
value 8
error Error: bad luck!
*/

This is correct in functional programming. Declaring an observable is like declaring a function. So subscribing to an Observable is like calling a function. And so re-subscribing to an Observable is like re-calling a function; it just re-executes its logic again, isolated from any other callers.
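Re-subscribing is exactly what `retry` does. A minimal sketch of that idea, using a hypothetical `MiniObservable` and a hypothetical `retry(source, count)` helper (not RxJS's implementation; teardown is omitted for brevity): each retry runs the producer again, so the Fibonacci sequence restarts from 0, just as calling a function twice runs it twice.

```javascript
// Hypothetical MiniObservable: every subscribe() runs the producer afresh.
// Teardown is omitted to keep the sketch short.
class MiniObservable {
  constructor(subscriber) {
    this._subscriber = subscriber;
  }

  subscribe(observer) {
    let closed = false;
    this._subscriber({
      next: (v) => { if (!closed) observer.next(v); },
      error: (e) => { if (!closed) { closed = true; observer.error(e); } },
      complete: () => { if (!closed) { closed = true; observer.complete(); } },
    });
  }
}

// Hypothetical retry: on error, subscribe to `source` again, up to `count` times.
function retry(source, count) {
  return new MiniObservable((observer) => {
    let attemptsLeft = count;
    const attempt = () => source.subscribe({
      next: (v) => observer.next(v),
      error: (e) => {
        if (attemptsLeft > 0) {
          attemptsLeft -= 1;
          attempt(); // re-run the producer, like calling the function again
        } else {
          observer.error(e);
        }
      },
      complete: () => observer.complete(),
    });
    attempt();
  });
}

// A producer that emits seven Fibonacci numbers, then fails.
let executions = 0;
const fibsThenFail = new MiniObservable((observer) => {
  executions += 1;
  let [i, n] = [0, 1]; // fresh state on every subscription
  for (let k = 0; k < 7; k++) {
    observer.next(i);
    [i, n] = [n, i + n];
  }
  observer.error(new Error('bad luck!'));
});

const seen = [];
retry(fibsThenFail, 1).subscribe({
  next: (v) => seen.push(v),
  error: (e) => seen.push(`error ${e.message}`),
  complete: () => seen.push('done'),
});
// seen: 0 1 1 2 3 5 8 0 1 1 2 3 5 8 'error bad luck!'; executions === 2
```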

But it might be critical to preserve the current state of the Fibonacci calculation. If that's the case, we can multicast the values through a Subject (Subjects are like reactive variables) and preserve the original subscription:

fibs(100 /*ms*/)
  .multicast(
    () => new ReplaySubject(1),
    (xs) => xs
      .do((val) => {
        if (val === 13) {
          throw new Error('bad luck!');
        }
      })
      .catch(() => xs)
  )
  .take(10)
  .subscribe({
    next: console.log.bind(console, 'value'),
    error: console.log.bind(console, 'error'),
    complete: console.log.bind(console, 'done')
  });
/*
value 0
value 1
value 1
value 2
value 3
value 5
value 8
value 13
value 21
value 34
done undefined
*/
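The difference from `retry` is where the state lives. In this simplified sketch (a hypothetical `MiniSubject`, closer to a plain `Subject` than to `ReplaySubject(1)`, and driven by hand instead of by `interval`), one shared producer advances the Fibonacci state; a consumer that drops out and subscribes again continues from 13 instead of restarting at 0.

```javascript
// Hypothetical MiniSubject: one producer execution fanned out to many observers.
class MiniSubject {
  constructor() {
    this.observers = new Set();
  }

  subscribe(observer) {
    this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }

  next(value) {
    for (const o of [...this.observers]) o.next(value);
  }
}

// The Fibonacci state lives in exactly one place: the shared producer.
let [a, b] = [0, 1];
const fib = new MiniSubject();
const tick = () => {
  fib.next(a);
  [a, b] = [b, a + b];
};

const received = [];
const firstSub = fib.subscribe({ next: (v) => received.push(v) });
for (let k = 0; k < 7; k++) tick(); // 0 1 1 2 3 5 8
firstSub.unsubscribe();              // e.g. a downstream error ended this consumer

fib.subscribe({ next: (v) => received.push(v) }); // roughly what catch(() => xs) does
for (let k = 0; k < 3; k++) tick(); // 13 21 34: the sequence continues, no restart
// received: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```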
jordalgo commented 7 years ago

@trxcllnt Thanks much! Just a few comments :)

Per the Observable contract, an Observer's onError can only be called once, after which no more messages can be sent to that Observer.

I do admit I feel a little silly trying to change a type that has been around for so long but if you'll indulge me...

In your example:

function fibs(period = 0) {
  return Observable
    .interval(period)
    .scan(([i, n]) => [n, i + n], [1, 0])
    .pluck(0);
}

fibs(100 /*ms*/).do((val) => {
    if (val === 13) {
      throw new Error('bad luck!');
    }
  })
  .subscribe({
    next: console.log.bind(console, 'value'),
    error: console.log.bind(console, 'error'),
    complete: console.log.bind(console, 'done')
  });

This is correct in functional programming. Declaring an observable is like declaring a function. So subscribing to an Observable is like calling a function.

No disagreeing with you there but I'm still a bit lost about why treating the error path similarly to the next path violates functional programming. I'm thinking of the Either or Task/Future types which seem to treat errors as just a value rather than a state. Again, please forgive me if I'm missing something.

trxcllnt commented 7 years ago

No problem at all. The Observable is the push dual of Enumerable (aka Haskell's Data.List monad). The duality of the type is only maintained if the push semantics of Observability mirror the pull semantics of Enumerability. There are four key parts to enumerating a list of values:

try {
  while (iterator.hasNext()) { // <-- can throw or return false
    // 1. handle next value
    const x = iterator.getCurrent();
    // 4. break early
    if (x === 5) { break; }
  }
  // 3. may have iterated some values, then finished successfully
} catch(e) {
  // 2. may have iterated some values, then got an error
}

And their Observable (push) equivalents:

const subscription = observable.subscribe({
  next(x) { /* 1. handle next value */ },
  error(e) { /* 2. may have observed some values, then got an error */ },
  complete() { /* 3. may have observed some values, then finished successfully */ }
});
// 4. break early
subscription.unsubscribe();
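The pull side of this correspondence can be tried directly with JavaScript's own iteration protocol. In this sketch (the `numbers` generator is a hypothetical example), breaking out of a `for...of` loop calls the iterator's `return()` method, which runs the generator's `finally` block. That is the pull-side counterpart of `unsubscribe()` running an Observable's teardown.

```javascript
// A hypothetical infinite source whose cleanup is visible in `log`.
function* numbers(log) {
  try {
    for (let i = 1; ; i++) yield i;
  } finally {
    log.push('cleanup'); // runs when the consumer breaks early (via return())
  }
}

const events = [];
try {
  for (const x of numbers(events)) {
    events.push(x);       // 1. handle next value
    if (x === 3) break;   // 4. break early: the generator's finally block runs
  }
  events.push('finished'); // 3. left the loop without an error
} catch (e) {
  events.push(`error ${e.message}`); // 2. got an error
}
// events: [1, 2, 3, 'cleanup', 'finished']
```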

Enumerables and Observables as they're defined here are referentially transparent; invoking enumerable.getEnumerator() or observable.subscribe() executes some discrete computation to produce results, just like a function. In fact, a function can be thought of as an Enumerable that produces a single value. When you call a function a second time, you expect it to do its work again. And each time you call a function, it can only throw a single value. The same must be true about Enumerables, and thus, Observables.
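The "can only throw once" point can also be checked with a generator. In this sketch (`failsAtTwo` is a hypothetical example), once the generator throws, its iterator is finished for good; calling the generator function again starts a fresh, independent execution, which is the behavior `subscribe()` mirrors.

```javascript
function* failsAtTwo() {
  yield 1;
  throw new Error('boom'); // the single error this execution can produce
}

const results = [];
const it = failsAtTwo();
results.push(it.next().value); // 1

try {
  it.next(); // the error propagates out of the generator
} catch (e) {
  results.push(e.message); // 'boom'
}

results.push(it.next().done); // true: after throwing, the iterator is closed

// Calling the function again re-executes it from the start.
results.push(failsAtTwo().next().value); // 1
// results: [1, 'boom', true, 1]
```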

appsforartists commented 7 years ago

I really appreciate both of you taking the time to explain your points of view. Thank you.

jordalgo commented 7 years ago

@trxcllnt - Thanks so much, that is great info. I had seen an Erik Meijer talk on duality but hadn't read that paper.

The code example you provided really illustrates your point well. I do have a few more questions, if you'll allow me to push on.

1a. So an observer's error handler is really a proxy for not being able to catch errors thrown in next (or perhaps while "iterating"). But since iterators often keep state (the index of the current value, for example), if an error is thrown while iterating, does that always result in this state being reset? Doesn't whatever code sits in the catch block have the opportunity to continue iterating after an error at that index?

1b. Also, if error handler is a proxy for try/catch then shouldn't an Observable's next be wrapped in a try/catch and routed to error? -- I feel like you, @trxcllnt , have argued for this before.
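Question 1b describes a specific behavior: catching exceptions thrown by the consumer's `next` and routing them to `error`, which then ends the subscription and runs cleanup. A minimal sketch of that behavior, assuming a hypothetical `subscribeRoutingErrors(producer, observer)` helper (it illustrates the idea under discussion, not the proposal's or RxJS's actual code):

```javascript
// Hypothetical helper: exceptions thrown by observer.next are caught and
// re-routed to observer.error, which also closes the subscription.
function subscribeRoutingErrors(producer, observer) {
  let closed = false;
  let cleanup;

  const close = () => {
    closed = true;
    if (cleanup) {
      const fn = cleanup;
      cleanup = undefined;
      fn(); // release the producer's resources exactly once
    }
  };

  const safe = {
    next(value) {
      if (closed) return;
      try {
        observer.next(value);
      } catch (e) {
        safe.error(e); // the thrown error travels down the error channel
      }
    },
    error(err) {
      if (closed) return;
      close();
      observer.error(err);
    },
  };

  cleanup = producer(safe);
  if (closed) close(); // the producer finished synchronously: clean up now
}

// Usage: the consumer throws while handling the value 2.
let cleanedUp = false;
const got = [];
subscribeRoutingErrors(
  (observer) => {
    [1, 2, 3].forEach((v) => observer.next(v));
    return () => { cleanedUp = true; };
  },
  {
    next(v) {
      if (v === 2) throw new Error('consumer bug');
      got.push(v);
    },
    error(e) { got.push(`error ${e.message}`); },
  },
);
// got: [1, 'error consumer bug'], cleanedUp: true
```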

benlesh commented 7 years ago

@jordalgo I'm probably going to oversimplify this, but basically, the error and complete channels on an observable exist explicitly to push notification that the producer wishes to tear down its resources, either due to an error or because it's simply done (respectively).

Comparisons to EventTarget are a little off here because EventTargets only push values out of themselves. They don't push errors (or completions, for that matter). Generally, errors involving an EventTarget originate in the handler registered with it. There is a discussion regarding handling errors thrown in the observable equivalent of those handlers, Observer, here: #119

Observables can be used to wrap or model EventTargets because they're a push-based async primitive. Observables can also be used to wrap or model Promises. They're a very low-level async primitive.

Observable is really just a function that accepts your Observer, wraps it in some guarantees, then passes it to the body of the function. It also adds some sugar around teardown and tying the observer to it. It doesn't multicast on its own. It doesn't have the concept of removing listeners on its own. It's a very primitive type. So trying to optimize Observable itself for a particular behavior around removing listeners, when the observable is wrapping some externally created producer that multicasts, is going to be awkward.

trxcllnt commented 7 years ago

@jordalgo onError isn't only meant to communicate errors thrown from onNext, even though it's also used for that. It's just meant to signal "an error happened, here it is, and by the way you won't be hearing anything else from me." Just as there's a perfectly reasonable use-case for signaling successful termination (e.g. interval(100).take(10)), there are also perfectly valid reasons for signaling unsuccessful termination (e.g. getJSON('/foo').timeout(3000)).

jordalgo commented 7 years ago

Thanks much, gents -- really appreciate the info here. I'm going to close this issue, as it seems like the Observable type (and the way it treats the error path) has been around too long to change and/or discuss here. I'm still not totally convinced that being "the push dual of Enumerable" implies that pushing something on the error path should cancel and clean up the subscription, but perhaps we can all discuss it with Erik Meijer one day over coffee (I'll buy, of course 😉).

It seems to me that maybe a strict Observable is not what I want to use for polling data, or for cases where I want to be able to pass operational errors downstream. I could do this with Observables by passing an object like { operationalError: null, data: [] } to next, but then if I have any operators like map or filter, it's a big pain to tease out the data I actually want to map or filter. I think a big take-away for me is that the Observable error path, similar to Promise reject/catch, is really for exceptions only.

trxcllnt commented 7 years ago

@jordalgo as @blesh mentioned, since Observables are an async primitive, they're meant to be composed into more complex types with combinators, multicasting, etc. You can build a lot of cool stuff as long as the underlying primitives follow all the rules :).

From what you've said, it sounds like multicasting a single source into two streams might be the way to go. One perhaps for composing computations on the values, and the other that emits operational errors. Then you can combine them in whichever way solves your problem (e.g. values.takeUntil(errors.skip(10))) :->
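A rough, callback-level sketch of that suggestion, assuming a hypothetical `createShared()` helper rather than RxJS's `Subject`, `takeUntil`, and `skip` operators: one source of tagged results is shared by two views, one that sees only values and one that sees only operational errors. The values view stops once more than `MAX_ERRORS` errors have arrived, which is the rule `values.takeUntil(errors.skip(10))` expresses, with 10 replaced by `MAX_ERRORS`.

```javascript
// Hypothetical shared source: every emitted item goes to every listener.
function createShared() {
  const listeners = new Set();
  return {
    emit(item) {
      for (const listener of [...listeners]) listener(item);
    },
    on(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener); // returns an "unsubscribe"
    },
  };
}

const source = createShared();
// Two views over one source of tagged results: { ok, data } or { ok, error }.
const onValue = (cb) => source.on((r) => { if (r.ok) cb(r.data); });
const onError = (cb) => source.on((r) => { if (!r.ok) cb(r.error); });

const MAX_ERRORS = 2; // plays the role of the 10 in errors.skip(10)
const received = [];
let errorCount = 0;

const stopValues = onValue((data) => received.push(data));
const stopErrors = onError(() => {
  errorCount += 1;
  if (errorCount > MAX_ERRORS) {
    stopValues(); // like takeUntil(errors.skip(MAX_ERRORS))
    stopErrors();
  }
});

source.emit({ ok: true, data: 'a' });
source.emit({ ok: false, error: new Error('503') });
source.emit({ ok: true, data: 'b' });
source.emit({ ok: false, error: new Error('404') });
source.emit({ ok: true, data: 'c' });
source.emit({ ok: false, error: new Error('500') }); // third error: stop listening
source.emit({ ok: true, data: 'd' });                // no longer received
// received: ['a', 'b', 'c'], errorCount: 3
```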

jordalgo commented 7 years ago

@trxcllnt - That's a good suggestion; I'll tinker with that some 👍

jordalgo commented 7 years ago

I don't think I'm going to re-open this issue but I was wondering if some generous folks wouldn't mind providing some additional input and knowledge.

I fully understand that the Observable type has been around for a while (and there is a contract) and has always signaled the end of the stream by pushing either an error or a complete. My question is: could this proposal focus on delivering the highest-value push primitive rather than on strictly being an Observable (the Enumerable dual)? I say that not to be inflammatory but to advocate for what I think is a nice feature: not 'auto-closing' a stream when the producer pushes an error.

Now that the Observable type is not going to route caught errors to the observer's error handler (#119), I'm trying to understand the actual usefulness of not letting the producer make its own decision about when to close the subscription, since any error it pipes to an observer's error handler is effectively handled (by the producer). This is not a new concept in the JS stream community (Kefir, Bacon), so even though this may be moot (as this is a proposal for an Observable), I was hoping to get some good pros and cons of this particular piece of functionality.

appsforartists commented 7 years ago

FWIW, our team also found the termination of streams on complete/error to be harder to reason about (and complexity we didn't need), so we built an implementation that only supports next:

https://github.com/material-motion/indefinite-observable-js/

benlesh commented 7 years ago

@appsforartists this is interesting, but I see a few possible weak points:

  1. Without an error channel, you're missing the important ability to compose async errors that both Promise and Observable give you.
  2. Without a complete channel, you can't ever know if your set of values is done; therefore you can never concat, zip, or even merge effectively.
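Point 2 can be made concrete with a small sketch. Here an observable is modeled as a bare function that takes an observer (teardown omitted), and `fromArray`, `concat`, and `collect` are hypothetical helpers: `concat` can only start the second source once the first calls `complete()`, so a source that never completes keeps the second one from ever running.

```javascript
// Hypothetical helper: an observable that emits the items, then completes.
const fromArray = (items) => (observer) => {
  for (const item of items) observer.next(item);
  observer.complete();
};

// Hypothetical concat: subscribe to `second` only once `first` completes.
const concat = (first, second) => (observer) => {
  first({
    next: (v) => observer.next(v),
    error: (e) => observer.error(e),
    complete: () => second(observer), // the only signal that `first` is done
  });
};

// Collect everything an observable sends into a plain array.
const collect = (observable) => {
  const out = [];
  observable({
    next: (v) => out.push(v),
    error: (e) => out.push(`error ${e.message}`),
    complete: () => out.push('done'),
  });
  return out;
};

const neverCompletes = (observer) => observer.next(0); // no complete() signal

const finite = collect(concat(fromArray([1, 2]), fromArray([3])));
const stuck = collect(concat(neverCompletes, fromArray([9])));
// finite: [1, 2, 3, 'done']; stuck: [0] (the second source never starts)
```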

I can understand how an error causing tear down of the producer can be confusing. But surely it can be understood that the producer will be torn down when the stream is complete?

I think you could get the same ergonomics without the abstraction overhead by using simple callbacks and explicit teardowns if you're more comfortable with that.

appsforartists commented 7 years ago

@blesh, I don't know if this is the right venue for this conversation, but the values we're modeling never complete; they only come to rest. We have a state channel that represents whether the stream is active or atRest:

https://github.com/material-motion/material-motion-js/blob/develop/packages/streams/src/MotionObservable.ts

IndefiniteObservable is just a foundation to build more interesting observables on top of, whether that's by adding operators, channels, or in our case: both.

trxcllnt commented 7 years ago

@jordalgo one key guarantee of maintaining functional purity (at the level of the async primitive) is deterministic memory management; auto-disposal on termination guarantees you won't have memory leaks (e.g. from forgetting to remove an event listener) in your async code. Since the Observer grammar specifies you won't be notified after termination, not tearing down the subscription would only leak memory.

Observables are ultimately just referentially transparent functions (as opposed to stateful lists of Observers to notify). It may be helpful to think of auto-disposal as the push-dual of deallocating the memory in a stack frame after a function returns.

@appsforartists onError and onComplete just mirror throw and exit for Arrays/Enumerables. obs1.concat(obs2).subscribe() is the push version of array1.concat(array2).forEach(). Observables are the push-dual of Haskell's List monad. Haskell has no trouble lazily enumerating infinite lists, and it's quite common/useful to have infinite Observables as well (Observable.interval(10) is an infinite list of non-negative integers). But without onError or onComplete, you can't even write an operator as simple as take, so cutting those messages out of the grammar severely limits the types of problems you can solve.
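A sketch of why `take` needs the terminal messages, assuming a bare-function observable model (calling `source(observer)` starts the producer and returns its cleanup function) and hypothetical `take` and `naturals` helpers rather than RxJS's operators: after `count` values, `take` must both send `complete()` downstream and tear down the possibly infinite upstream.

```javascript
// Hypothetical take for a bare-function observable model.
// Assumes count >= 1.
function take(source, count) {
  return (observer) => {
    let taken = 0;
    let closed = false;
    let cleanup; // upstream cleanup, known once source() returns

    const stopUpstream = () => {
      if (cleanup) {
        const fn = cleanup;
        cleanup = undefined;
        fn();
      }
    };

    cleanup = source({
      next(value) {
        if (closed) return;
        observer.next(value);
        taken += 1;
        if (taken >= count) {
          closed = true;
          observer.complete(); // take(n) ends by completing after n values...
          stopUpstream();      // ...and by tearing down the endless upstream
        }
      },
      error(err) {
        if (closed) return;
        closed = true;
        observer.error(err);
        stopUpstream();
      },
      complete() {
        if (closed) return;
        closed = true;
        observer.complete();
        stopUpstream();
      },
    });
    if (closed) stopUpstream(); // upstream emitted everything synchronously

    return () => { closed = true; stopUpstream(); };
  };
}

// An endless source driven by hand (stands in for Observable.interval).
let push;
let upstreamStopped = false;
const naturals = (observer) => {
  push = (v) => observer.next(v);
  return () => { upstreamStopped = true; };
};

const out = [];
take(naturals, 3)({
  next: (v) => out.push(v),
  error: () => {},
  complete: () => out.push('done'),
});
for (let i = 0; i < 5; i++) push(i); // values after the third are ignored
// out: [0, 1, 2, 'done'], upstreamStopped: true
```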

appsforartists commented 7 years ago

@trxcllnt We can always add them back if we need them. Our working hypothesis is that for interactive systems, an author ought to be able to declare the system once and never worry about it being torn down because something in the chain called error/complete.

If we ever find we need to build something like take that won't ever dispatch new values, even if the system is interacted with again, we'll revisit that hypothesis. 😃

jordalgo commented 7 years ago

@trxcllnt I feel like we may be talking past each other as I'm quite confused as to how not closing the subscription when, specifically, the producer calls observer error causes memory leaks. Essentially you'd be turning error into another next, which doesn't seem beneficial at first but when you start to incorporate operators like map you can allow errors to pass through (un-mapped) and let the observer decide how they want to deal with them e.g.

const backendPoll = new Observable(observer => {
  const poller = setInterval(() => {
    requestDataFromBackend(res => {
      if (res.status !== 200) {
        observer.error(new Error('Uh oh. Non 200 received.'));
      } else {
        observer.next(res);
      }
    });
  }, 2000);
  return () => { clearInterval(poller); };
});

backendPoll
.map(data => {
  // do something cool
})
.subscribe({
  next: mappedData => {},
  error: error => {
     // send user a message but keep listening
  }
});

And if the producer wanted to add extra protection, it could do this:

const backendPoll = new Observable(observer => {
  const poller = setInterval(() => {
    try {
      requestDataFromBackend(res => {
        if (res.status !== 200) {
          observer.error(new Error('Uh oh. Non 200 received.'));
        } else {
          observer.next(res);
        }
      });
    } catch (e) {
       observer.error(e);
       observer.complete();
    } 
  }, 2000);
  return () => { clearInterval(poller); };
});

I don't see how this makes the primitive not functionally pure (perhaps I'm missing something).

trxcllnt commented 7 years ago

@jordalgo perhaps the easiest way to understand this is to write the equivalent synchronous function. Does this function throw an error, or return a result? It can't do both:

function backendPoll() {
  let result;
  try {
    result = requestDataFromBackend();
  } catch (e) {
    throw e;
  }
  return result;
}

While I don't wish to discount your use-case, the problem you're describing is a level or two higher than what the Observer grammar was designed to express. I would encourage skimming the Rx Design Guidelines [pdf] for a more thorough explanation of the design, but I'll try to hit the high notes here.

The Observer design is very low-level. Like, programming-language-design level. From this perspective, the design must strive to define the minimal grammar required to express as many different ideas as possible. Since Observable is the push dual of functional enumerable sequences, we already have a set of rules we need to follow. In fact it makes life easier, since we're not trying to do anything that hasn't already been done before. Observables are a programming language primitive, which is why they make sense to build into JavaScript, like they already are in C#, Dart, and maybe others at this point.

onError is the push-dual of throw, which means it has to do all the same things throw does (interrupt the current operation and switch execution to try/catch blocks), except for push-sequences.

If you take throw out of a language, you've removed a critical structure of control-flow, and can't describe the same things, or at least, can't describe them the same way. Obviously there are languages which have different error-handling semantics, but JS has this one, so it's not worth debating the merits of throw or try/catch.

If we agree on that, then we get back to the original topic, tearing down subscriptions after onError is called. Again, it's helpful to look at the behavior of throw with respect to GC to know what to do. If a function allocates memory for a pointer it intends to return, but somewhere along the line it throws an error, the VM knows it's safe to GC that memory.

But since push semantics aren't built into the VM, we have to help the VM out a little if we're going to fully implement the push-dual of throw. After onError has switched to the error branch, we have to manually remove refs to the async resources we've allocated (event listeners, open sockets, etc.). Ideally the VM would know about async resources through language features (like async/await), but until then, we have to do it ourselves.

As to your use-case, from a PLT perspective, the error-handling primitive isn't just another value type; it's a separate branch of control-flow. If you want to work with errors as values, you can materialize them as values through another branch of the value path and compose them like any other value. While your proposed design for onError may be adequate for this case, it can't be generalized to all cases, whereas the general solution can be specialized to handle this case.
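One way to materialize errors as values is an operator modeled loosely on Rx's `materialize`, which turns each notification into a plain object on the next channel. The sketch below uses a bare-function observable model and a hypothetical `materialize` helper (not RxJS's code). The stream still terminates after the error; what changes is that downstream operators such as `map` or `filter` see the error as data.

```javascript
// Hypothetical materialize: every notification becomes a value on `next`.
const materialize = (source) => (observer) =>
  source({
    next: (value) => observer.next({ kind: 'N', value }),
    error: (error) => {
      observer.next({ kind: 'E', error }); // the error arrives as data...
      observer.complete();                 // ...and the stream then ends normally
    },
    complete: () => {
      observer.next({ kind: 'C' });
      observer.complete();
    },
  });

// A source that emits two responses, then fails.
const responses = (observer) => {
  observer.next(200);
  observer.next(200);
  observer.error(new Error('Uh oh. Non 200 received.'));
};

const notifications = [];
materialize(responses)({
  next: (n) => notifications.push(n.kind === 'E' ? `E:${n.error.message}` : `${n.kind}:${n.value}`),
  error: () => notifications.push('never called'),
  complete: () => notifications.push('done'),
});
// notifications: ['N:200', 'N:200', 'E:Uh oh. Non 200 received.', 'done']
```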

As an exercise, here's one way to implement a solution that materializes non-200's into values without terminating the stream:

const backendPoll = new Observable(observer => {
    const values = new Subject();
    const errors = new Subject();
    let resource, interval = setInterval(() => {
        resource = requestDataFromBackend(res => {
            resource = null;
            if (res.status !== 200) {
                errors.next(new Error('Uh oh. Non 200 received.'));
            } else {
                values.next(res);
            }
        });
    }, 2000);
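    observer.next({ values, errors });
    // Don't call observer.complete() here: completing would immediately run
    // the teardown below and clear the interval before any values arrive.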
    return () => {
        clearInterval(interval);
        if (resource) {
            resource.unsubscribe();
        }
    };
});

backendPoll
    .mergeMap(({ values, errors }) => Observable.merge(
        values.map(data => {
            // do something cool
        }),
        errors.do(err => {
            // print the error, or combine with values, takeUntil, etc.
        })
    ))
    .subscribe((mappedData) => { /* etc. */ })
jordalgo commented 7 years ago

@trxcllnt - Thanks for taking the time to discuss this. Unfortunately, either I'm missing something very obvious or I'm doing a terrible job of explaining my POV 😄

So first off, I agree with pretty much everything you said e.g. onError being the push dual of throw, Observable being the push dual of functional enumerable sequences, etc... However, as I tried to explain earlier, I'm really not advocating for a change to the Observable type, which is why I haven't re-opened this issue. I totally get that it has certain rules that have been well defined for a long time. I'm really just asking an open question, which is: Is a strict Observable the only or even the best push primitive that we should be bringing to JS? (maybe that's a separate issue I should open :trollface: )

As you've illustrated in your examples, it's definitely possible to accomplish my use-case using current Observable semantics. However, I'm still not convinced that an alternative push primitive (probably very similar to an Observable) that doesn't close and clean-up resources onError, for example, isn't a better design for JS.

To address some of your points:

If you take throw out of a language, you've removed a critical structure of control-flow, and can't describe the same things, or at least, can't describe them the same way. Obviously there are languages which have different error-handling semantics, but JS has this one, so it's not worth debating the merits of throw or try/catch.

I'm not debating the merits of try/catch in the same way that node wasn't when it used the CPS design pattern. To steal a quote from this article: "When a function passes its errors to a callback it no longer has to make assumptions on how that error should be handled. readFile() itself has no idea how severe a file read error is to your specific application. It could be expected, or it could be catastrophic. Instead of having to decide itself, readFile() propagates it back for you to handle." I feel the same should exist for the future JS push primitive.
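For reference, the error-first callback convention the quote describes looks like this. `readFileLike` is a hypothetical, synchronous, in-memory stand-in for Node's `fs.readFile` (the real one is asynchronous); the point is that the callee only reports the error, and the caller decides whether it is expected or fatal.

```javascript
// Hypothetical stand-in for fs.readFile: callback(err) or callback(null, data).
function readFileLike(files, name, callback) {
  if (Object.prototype.hasOwnProperty.call(files, name)) {
    callback(null, files[name]);
  } else {
    const err = new Error(`ENOENT: no such file '${name}'`);
    err.code = 'ENOENT';
    callback(err);
  }
}

const files = { 'config.json': '{"debug":true}' };

// The caller, not the reader, decides how severe each error is.
function loadConfig(name, done) {
  readFileLike(files, name, (err, text) => {
    if (err && err.code === 'ENOENT') return done(null, {}); // expected: use defaults
    if (err) return done(err);                               // anything else is fatal
    return done(null, JSON.parse(text));
  });
}

let found;
let missing;
loadConfig('config.json', (err, cfg) => { found = cfg; });
loadConfig('missing.json', (err, cfg) => { missing = cfg; });
// found: { debug: true }; missing: {}
```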

In addition (and sorry if I'm repeating myself), since the current Observable spec does not catch thrown errors and pipe them to onError (which would then manually remove refs to the async resources we've allocated), I don't quite see the point of forcing this on the producer when it has the power to do this itself by calling complete. I feel like your argument of the primitive needing to de-allocate resources only holds water if the primitive catches thrown errors then closes and cleans up the subscription as a result, which this currently doesn't do.

trxcllnt commented 7 years ago

I'm really just asking an open question, which is: Is a strict Observable the only or even the best push primitive that we should be bringing to JS?

I honestly can't answer that for anyone besides me, as this is more about personal taste than programming language design.

I can say that over the years I, and many others, have found Observables to be a fantastic solution to a broad range of problems. Asynchrony cross-cuts problem domains; doing it well is a headache in every system I've ever built. For the few problems one-way push sequences aren't adequate on their own (like back-pressure), they can be combined with their Enumerable dual to build Async Iterables, and the meticulous adherence to duality between the two makes them a delight to work with :-)

Observables have held up after years of battle-testing in some of the biggest software systems in the world (Netflix Edge, Microsoft Azure, and soon Facebook's infrastructure), which is exactly the quality of proof I'd personally feel comfortable building into a language that millions of people work with every day.

"When a function passes its errors to a callback it no longer has to make assumptions on how that error should be handled. readFile() itself has no idea how severe a file read error is to your specific application. It could be expected, or it could be catastrophic. Instead of having to decide itself, readFile() propagates it back for you to handle." I feel the same should exist for the future JS push primitive.

I feel like your argument of the primitive needing to de-allocate resources only holds water if the primitive catches thrown errors then closes and cleans up the subscription as a result, which this currently doesn't do.

You're absolutely right, which is why I've argued so strongly against this in #119 and #47. Unfortunately the current state of this project does reflect a fundamental departure from the Rx grammar, a change which I believe should require the same level of proof as we have for the existing Observable semantics. Sadly, I can't berate @jhusain in person about this anymore since I'm no longer at Netflix.

jordalgo commented 7 years ago

I honestly can't answer that for anyone besides me, as this is more about personal taste than programming language design.

Definitely, but given this might become part of the language, I'm advocating for my own taste 😄 and attempting to back it up with arguments.

For the few problems one-way push sequences aren't adequate on their own (like back-pressure), they can be combined with their Enumerable dual to build Async Iterables, and the meticulous adherence to duality between the two makes them a delight to work with :-)

This is a great argument for keeping onError as is 👍 though I would love to see some code samples just to help solidify this argument (obviously only if you're still interested in continuing this discussion lol).

Observables have held up after years of battle-testing in some of the biggest software systems in the world

No disagreement here. There are many other stream/observable-like libs out there but definitely none as popular as Rx's.

Unfortunately the current state of this project does reflect a fundamental departure from the Rx grammar, a change which I believe should require the same level of proof as we have for the existing Observable semantics.

I'm just riding this slippery slope because, if this spec violates the Observable contract by not catching thrown errors to close subscriptions and de-allocate resources, is it still an Observable? Perhaps that level of specific error handling is not in the contract.

jordalgo commented 7 years ago

After a lot of consideration I’ve decided to re-open this thread for the following reasons:

I know this is a departure from how RxJs currently implements Observable but that doesn’t mean it shouldn’t be considered and discussed further as other libraries (mentioned in the above comments) have this behavior.

It’s mostly been @trxcllnt and I discussing this, which has been incredibly valuable, but I would also really appreciate input from @jhusain , @headinthebox and other experienced devs in this arena.

(Additionally, and I hesitate to mention it because I think what I said above is the best option, perhaps a case could be made for only closing/cleaning-up when these two events occur 1, 2)