WICG / observable

Observable API proposal
https://wicg.github.io/observable/

How important is the "firehose" problem? #12

Closed domfarolino closed 11 months ago

domfarolino commented 1 year ago

A 2019 attempt at reviving the old TC39 proposal involved an API simplification that incidentally allowed Observables to handle the "synchronous firehose of data" problem. That is, it lets a subscriber unsubscribe (via AbortController, in that proposal) while an Observable is still synchronously pushing a "firehose" of data to the next() handler, before subscribe() has returned (see this example).

My understanding is that neither RxJS nor zen-observable currently supports this, but if we're generalizing the API for the platform, maybe we should consider it? Designing Observables to handle this seems nice: a subscriber can unsubscribe via token-based cancellation from inside the next() handler, even before subscribe() has returned, if the producer pushes "enough" (or "too much") data.
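To make the token-based idea concrete, here is a minimal sketch. SketchObservable, Producer, and Observer are hypothetical names invented for this illustration; they are not the proposal's API. The only platform pieces used are AbortController and AbortSignal.

```typescript
// Hypothetical minimal types for illustration; not the proposal's API surface.
type Observer<T> = { next: (value: T) => void };
type Producer<T> = (observer: Observer<T>, signal: AbortSignal) => void;

class SketchObservable<T> {
  private producer: Producer<T>;

  constructor(producer: Producer<T>) {
    this.producer = producer;
  }

  // subscribe() returns nothing; cancellation goes through the signal only.
  subscribe(observer: Observer<T>, options: { signal: AbortSignal }): void {
    const { signal } = options;
    if (signal.aborted) return;
    this.producer(
      {
        // Drop values once the consumer has aborted.
        next: (value) => {
          if (!signal.aborted) observer.next(value);
        },
      },
      signal,
    );
  }
}

// A synchronous firehose that checks the token on every iteration.
const firehose = new SketchObservable<number>((observer, signal) => {
  let n = 0;
  while (!signal.aborted) {
    observer.next(n++);
  }
});

const controller = new AbortController();
const received: number[] = [];

firehose.subscribe(
  {
    next: (value) => {
      received.push(value);
      // Abort from inside next(), before subscribe() has returned.
      if (received.length === 3) controller.abort();
    },
  },
  { signal: controller.signal },
);

console.log(received); // [0, 1, 2]
```

Because the controller exists before subscribe() is called, the consumer already holds the cancellation handle while the producer is still pushing synchronously, so the loop stops after the third value.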

On the other hand, it could get a little clunky. With this approach, subscribe() returns nothing and the assumption is that your only interaction with a "subscription" is via the AbortController that you must create before subscribing. That's not terrible, but it could get worse if we ever wanted to give more functionality to a "subscription" beyond just cancellation. In that world we might need to introduce a Subscription object that subscribe() returns, but then you'd have two objects to keep track of for any given subscription: the AbortController that you created before calling subscribe() and then the Subscription returned from subscribe().

That, plus the fact that two major Observable libraries today don't support this (IIUC) might mean there is not enough appetite to cater our design to it. Thoughts?

benlesh commented 1 year ago

So, it's a problem... but it's a bit of an edge case. The problem usually comes from composition where people are creating "inner subscriptions" for things like custom-implemented operators that subscribe to observables to "flatten them" like concatMap et al.

To illustrate the issue, I'll just use a function (not like the whole pipeable operator thing from RxJS):

/**
 * A function that flattens an observable of observables
 */
function flatten<T>(sources: Observable<Observable<T>>) {
  return new Observable<T>(subscriber => {
    let outerComplete = false;
    const innerSubscriptions = new Set<Subscription>();

    const sourceSubscription = sources.subscribe({
      next: (innerSource) => {
        const innerSubscription = innerSource.subscribe({
          next: value => subscriber.next(value),
          error: err => subscriber.error(err),
          complete: () => {
            innerSubscriptions.delete(innerSubscription);
            if (outerComplete && innerSubscriptions.size === 0) {
              subscriber.complete();
            }
          }
        });
        /*** THE PROBLEM EXISTS IN THIS TEMPORAL SPACE ***/
        innerSubscriptions.add(innerSubscription);
      },
      error: err => subscriber.error(err),
      complete: () => {
        outerComplete = true;
        if (innerSubscriptions.size === 0) {
          subscriber.complete();
        }
      }
    });

    return () => {
      sourceSubscription.unsubscribe();

      for (const subscription of innerSubscriptions) {
        subscription.unsubscribe();
      }

      innerSubscriptions.clear();
    }
  })
}

function take<T>(source: Observable<T>, count: number) {
  return new Observable<T>(subscriber => {
    let counter = 0

    const subscription = source.subscribe({
      next: (value) => {
        if (counter < count) {
          subscriber.next(value);
          if (++counter === count) {
            subscriber.complete(); // NOTE: causes teardown
          }
        }
      },
      error: err => subscriber.error(err),
      complete: () => subscriber.complete(),
    })

    return () => subscription.unsubscribe() // teardown
  })
}

const firehose = new Observable<number>(subscriber => {
  let n = 0;
  while (!subscriber.closed) {
    subscriber.next(n++);
  }
});

And then we use it like this:

// flatten() expects an observable of observables, so emit the firehose itself.
const source = Observable.from([firehose]);

const flattened = flatten(source);

const threeValues = take(flattened, 3);

// Logs three values, then locks the thread! :)
threeValues.subscribe(console.log);

The problem is that since we have to wait for the inner observable's subscribe call to complete and return before we get a handle we can use to tear it down, we can't flip the inner observable "closed" in time. It still only emits the 3 values, but that while (!subscriber.closed) loop in the inner subscription will never stop.
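The timing gap can be reproduced without any library. The sketch below is an assumption-laden reduction: subscribeTo, Handle, Sink, and cappedFirehose are hypothetical names, and the firehose is capped at CAP iterations so the example terminates instead of locking the thread.

```typescript
// Hypothetical minimal machinery: subscribeTo() returns the only
// cancellation handle, mirroring a subscribe() that returns a Subscription.
type Handle = { closed: boolean; unsubscribe: () => void };
type Sink<T> = { next: (value: T) => void; readonly closed: boolean };

function subscribeTo<T>(
  producer: (sink: Sink<T>) => void,
  next: (value: T) => void,
): Handle {
  const handle: Handle = {
    closed: false,
    unsubscribe: () => {
      handle.closed = true;
    },
  };
  producer({
    next: (value) => {
      if (!handle.closed) next(value);
    },
    get closed() {
      return handle.closed;
    },
  });
  return handle; // only now can the caller unsubscribe
}

const CAP = 1000; // stand-in for "forever"
let produced = 0;
const cappedFirehose = (sink: Sink<number>) => {
  while (!sink.closed && produced < CAP) {
    sink.next(produced++);
  }
};

const delivered: number[] = [];
let innerHandle: Handle | undefined;

innerHandle = subscribeTo(cappedFirehose, (value) => {
  if (delivered.length < 3) delivered.push(value);
  // The handle is still undefined here because subscribeTo() has not
  // returned yet, so this attempt to unsubscribe does nothing.
  if (delivered.length === 3) innerHandle?.unsubscribe();
});

console.log(delivered); // [0, 1, 2]
console.log(produced);  // 1000
```

The consumer only ever sees three values, yet the producer runs all the way to the cap, because the consumer has nothing to unsubscribe with until the synchronous loop has finished.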


RxJS has been handling this internally by chaining each Subscriber to its destination Subscriber at construction time, so the subscriptions are already composed before any value is pushed. However, that's mostly an internal implementation detail.

There is an RFC right now about moving to a more token-like API that resolves this issue (and a couple of other more RxJS-related issues)... although in RxJS's case we're leveraging Subscriber and Subscription as an "owner", I intentionally left signal out of it for now. I suspect that's where this should head.

The only problems I have with AbortSignal for RxJS usage:

  1. They're SUPER slow for registering and unregistering event handlers (this is probably solvable at a runtime level)
  2. They're not very ergonomic when it comes to creating "child subscriptions" so to speak. That would be the act of creating new AbortControllers that "follow" an existing AbortSignal.

But again, I think those are solvable issues.
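For the "child subscription" ergonomics point, a sketch of what following a parent signal can look like with today's primitives. childController is a hypothetical helper written for this illustration; it is not part of the platform or of RxJS.

```typescript
// Hypothetical helper; the name and shape are illustrative only.
function childController(parent: AbortSignal): AbortController {
  const child = new AbortController();
  if (parent.aborted) {
    child.abort(parent.reason);
    return child;
  }
  const onParentAbort = () => child.abort(parent.reason);
  parent.addEventListener("abort", onParentAbort, { once: true });
  // Unregister from the parent when the child is aborted on its own, so a
  // long-lived parent does not accumulate listeners.
  child.signal.addEventListener(
    "abort",
    () => parent.removeEventListener("abort", onParentAbort),
    { once: true },
  );
  return child;
}

const outer = new AbortController();
const innerA = childController(outer.signal);
const innerB = childController(outer.signal);

innerA.abort(); // aborting a child leaves the parent and siblings alone
const outerAfterChildAbort = outer.signal.aborted;
const siblingAfterChildAbort = innerB.signal.aborted;

outer.abort(); // aborting the parent aborts the remaining children
const siblingAfterParentAbort = innerB.signal.aborted;

console.log(outerAfterChildAbort, siblingAfterChildAbort, siblingAfterParentAbort);
```

The listener registration and removal on every child is the bookkeeping that both points above refer to. Newer runtimes also provide AbortSignal.any(), which covers the "follow several signals" part without a hand-written helper.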

I'm 100% amenable to providing APIs in RxJS-land that utilize AbortSignal, but only really if it's to match something that lands in the platform.

benlesh commented 1 year ago

TLDR here is that I think moving to a token-based cancellation mechanism resolves any issues you'd have with a "synchronous firehose".

domfarolino commented 11 months ago

I think I kinda follow the example above, but I'm struggling for a few reasons.

Shouldn't take() call subscriber.next() sometime?

Even then, when I experiment with it I get an error because flatten() accesses innerSubscription inside of innerSource.subscribe's complete() handler before innerSubscription has actually finished being declared. Also I'm a little confused because I guess nothing seems to be calling unsubscribe()? But regardless, I guess the point is that yeah, we never give unsubscribe() a chance to run outside of the next() handler, so even if the subscribe function honestly checks subscriber.closed (like in firehose), it doesn't actually matter.

Given all of that, I'm happy we're going the token-based cancellation route and that it would help stop issues in scenarios like this. It sounds like there isn't much actionable here, so shall we close this @benlesh?

benlesh commented 11 months ago

> Shouldn't take() call subscriber.next() sometime?

Yes, that was an oversight. I didn't test the code :) I've updated it.

> Also I'm a little confused because I guess nothing seems to be calling unsubscribe()?

When you complete(), it will tear down. That teardown should include unsubscribing from any subscriptions that were created.

A very important design feature of observable is that teardown will always be called if:

  1. The producer completes.
  2. The producer errors.
  3. The consumer unsubscribes.
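The three cases can be sketched as follows. subscribeWithTeardown and its types are hypothetical names for this illustration and are not RxJS or proposal APIs; the producer returns its teardown function, as in the examples earlier in this thread.

```typescript
type TeardownObserver<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};
type TeardownSubscriber<T> = Required<TeardownObserver<T>>;

// Hypothetical helper: runs the producer and guarantees its teardown runs
// exactly once on complete, error, or unsubscribe.
function subscribeWithTeardown<T>(
  producer: (subscriber: TeardownSubscriber<T>) => () => void,
  observer: TeardownObserver<T>,
): { unsubscribe: () => void } {
  let closed = false;
  let teardown: (() => void) | undefined;
  let pendingTeardown = false;

  const close = () => {
    if (closed) return;
    closed = true;
    if (teardown) teardown();
    else pendingTeardown = true; // producer finished before returning
  };

  teardown = producer({
    next: (v) => { if (!closed) observer.next?.(v); },
    error: (e) => { if (!closed) { observer.error?.(e); close(); } },
    complete: () => { if (!closed) { observer.complete?.(); close(); } },
  });
  // A synchronous complete/error happens before the teardown exists,
  // so it is run here, as soon as the producer has returned it.
  if (pendingTeardown) teardown();

  return { unsubscribe: close };
}

const log: string[] = [];
const make = (label: string, act: (s: TeardownSubscriber<number>) => void) =>
  (s: TeardownSubscriber<number>) => {
    act(s);
    return () => { log.push(`teardown:${label}`); };
  };

subscribeWithTeardown(make("complete", (s) => s.complete()), {});
subscribeWithTeardown(make("error", (s) => s.error(new Error("boom"))), {});
subscribeWithTeardown(make("unsubscribe", () => {}), {}).unsubscribe();

console.log(log);
// ["teardown:complete", "teardown:error", "teardown:unsubscribe"]
```

Note that a producer which completes synchronously closes before its teardown function has been returned. That is the same temporal gap as the firehose problem, seen from the producer side.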

> It sounds like there isn't much actionable here, so shall we close this @benlesh?

Yes we can close this

mk-pmb commented 2 months ago

@benlesh I'd like to try and disambiguate some variable names in your example and then try and play around with it. Would you mind releasing it as free software? (Could be as simple as a gist with the MIT license's copyright line and terms at the top.)

Do we have an example implementation of Observables that I could run this with?

Then maybe you could add a test case there that checks whether the current implementation is affected by the problem. (The infinite while loop should probably become a too-far-counting for loop.) (Edit: I see now that instead, the test should be able to accept any Observable factory or constructor.)

domfarolino commented 2 months ago

Just try it out in the Chromium implementation, which can be enabled under chrome://flags.