WICG / observable

Observable API proposal
https://wicg.github.io/observable/
Other
563 stars 13 forks source link

On using `AbortSignal` instead of `Subscription` #19

Closed benlesh closed 11 months ago

benlesh commented 1 year ago

I'm TOTALLY okay with it, and it solves a lot of problems one might have with the "synchronous firehose" issue that comes with dealing with potentially synchronous observables.

However, if we're doing that, you'd want to have the signal passed through the subscriber in the constructor in some way, so you can send it along to any inner subscriptions.

It's also worth noting that registering teardown logic with AbortSignal is cumbersome and very slow. It might be nice to have a better way to do that.

Finally, if we're going that route, the "teardown return" in the constructor is no longer necessary.

Examples:

// An observable to stream from a socket
const streamFromSocket = new Observable(subscriber => {
  const socket = new WebSocket('wss://example.com');
  socket.on('open').subscribe({
    next: () => socket.send('start'),
    signal: subscriber.signal,
  });
  socket.on('message').subscribe(subscriber)
})

const ac = new AbortController();

streamFromSocket.subscribe({
  next: console.log,
  signal: ac.signal,
})

// An observable to stream from someone's custom type
const customData = new Observable(subscriber => {
  const customDataSource = new CustomDataSource();
  customDataSource.onData = e => subscriber.next(e);

  // This is REALLY unergonomic and doesn't account for
  // subscribers that are already "closed".
  subscriber.signal.addEventListener('abort', () => {
    customDataSource.destroy()
  })
})

We may wish to add a custom method to Subscriber that does that abort registration for users:

// An observable to stream from someone's custom type
const customData = new Observable(subscriber => {
  const customDataSource = new CustomDataSource();
  customDataSource.onData = e => subscriber.next(e);

  subscriber.addTeardown(() => {
    customDataSource.destroy()
  })
})
domfarolino commented 1 year ago

However, if we're doing that, you'd want to have the signal passed through the subscriber in the constructor in some way, so you can send it along to any inner subscriptions.

Agreed. The current IDL sketch does already expose Subscriber.signal as an IDL attribute. The addTeardown() method is interesting, are you thinking the benefit would be to auto-call the teardown function at some point (maybe after the subscription function is called) if the user passes in a subscriber whose signal is already-aborted?

benlesh commented 1 year ago

Well, in RxJS, we have subscriber.add(teardown) where teardown could be a Subscription or a () => void. (It should be noted that Subscriber extends Subscription in RxJS)

The works roughly as follows:

add(teardown: Subscription | () => void) {
  if (teardown === this) return;
  if (this.closed) {
    if (typeof teardown === 'function') {
      teardown();
    } else {
      teardown.unsubscribe();
    }
  } else {
    if (!this.teardowns.has(teardown)) {
      this.teardowns.add(teardown);

      if (isSubscription(teardown)) {
        // make sure the child removes itself from the parent
        // if it unsubscribes first.
        teardown.add(() => {
          this.teardowns.remove(teardown);
        })
      }
    }
  }
}

remove(teardown: Subscription | () => void) {
  this.teardowns.delete(teardown);
}

This is just for reference. The "chaining" part is particularly useful when composing observables and building custom operations.

benlesh commented 1 year ago

Honestly, this add thing might be something that would be better added to AbortSignal directly? That's probably OOS here. Having signal on Subscriber might be enough... it'll just be unergonomic until AbortSignal gets some features, I suppose.

tbondwilkinson commented 1 year ago

+1 to adding features to AbortSignal is probably the right path forward.

In particular, AbortSignal is itself an EventTarget that emits abort events, so people may look at that and wonder whether AbortSignal could also support Observable-like semantics.

benjamingr commented 1 year ago

@benlesh AbortSignal support AbortSignal.any for this use case.

benlesh commented 11 months ago

@benjamingr that is very nice and helpful. I'll update the reference implementation to use that.

The only thing I see AbortSignal missing at this point is the fact it doesn't really prevent developers from screwing up and adding abort handlers to already-aborted signals. RxJS's Subscription provides some safety (outlined above) here... but basically if you add a teardown to a Subscription that is already closed, it will immediately execute that teardown, with the assumption that it's important to clean things up. Without that, developers will need to check aborted every time before they call addEventListener('abort', handler, { once: true }). All of which is pretty crappy DX.

Precedent for this would be Promise, which if it's already resolved, will execute any handler passed to then (scheduled of course, because promise). Honestly, if not for the scheduling, Promise would have been an ideal abort signal. Oh well.

But a simple whenAborted type method would be awesome.

Basically something like:

AbortSignal.prototype.whenAborted = function (callback, { signal }) {
  if (this.aborted) {
    callback();
  } else {
    this.#abort ??= this.on('abort').take(1);
    this.#abort.subscribe({ next: callback, signal });
  }
}
bakkot commented 11 months ago

For whenAborted, there's this old issue (though note that it suggests giving you a Promise, which would not allow you to synchronously call the cleanup function - for Zalgo reasons you probably don't want that anyway, though).

domfarolino commented 11 months ago

FWIW I do prefer AbortController/AbortSignal over Subscription, however I'm sympathetic to the foot-gun-yness of passing in an already-aborted signal and accidentally not handling that case. I think I'd be open to adding subscriber.addTeardown() as sugar over the signal.aborted check. I kind of like the first-class API for teardown, and it would be useful until AbortSignal gets this functionality (if it ever does). At worse, it becomes a little redundant if AbortSignal ever gets this.

Does that seem reasonable? Alternatively we could just leave this alone and purely go the modify-AbortSignal-instead route; I'm pretty open to both.

benlesh commented 11 months ago

I think that's a reasonable compromise.

benlesh commented 11 months ago

though note that it suggests giving you a Promise, which would not allow you to synchronously call the cleanup function - for Zalgo reasons you probably don't want that anyway.

@bakkot Honestly, zalgo isn't the biggest concern here, the bigger concern is making sure that any resource (which can be presumed at design time will be "expensive") should be torn down as soon as possible, even if the developer screws up somehow. Generally, no one would be counting on a synchronous side effect inside of an addTeardown call or whatever you'd want to call it. It's not a callback for fetching anything. And anything you'd want to know about whether it was called could be divined by checking closed on the subscriber (analogous to singal.aborted, really)

annevk commented 11 months ago

Why would you not also want tear down when you're done with the Observable somehow? E.g., in the "take 3" scenario. It seems unexpected that it would only run when it's being aborted.

domfarolino commented 11 months ago

I think you would want the teardown to run when the Observable is "done". At least that's what https://github.com/domfarolino/observable/pull/61 aims to do. The idea is that tear down would end once the observable signals completion or error (for producer-initiated teardown/completion) or when the consumer "unsubscribes" via aborting. Does that make sense?

annevk commented 11 months ago

I guess I don't understand why a signal would indicate completion. That's not part of abort signals.

domfarolino commented 11 months ago

Signal doesn't indicate completion. Signal abortion indicates the consumer of an Observable canceling its interest in the Observable and triggering the supplied teardown function. On the other hand if the Observable calls the subscriber's complete() function, to indicate that it has fully completed and will push no more values to the consumer, the teardown is also run, but signal isn't touched at all.

annevk commented 11 months ago

I don't see that in #61? If I look at the logic for what it is supposedly sugar for, the complete case does not seem covered?

domfarolino commented 11 months ago

Ah, yeah I guess my saying that addTeardown() is "syntactic sugar" conflicts with https://github.com/domfarolino/observable/pull/61/files#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R470-R471, which describes how the teardown is called during complete() and error(). I will clarify that PR now.