tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/

Permit unsubscribe to return a promise #211

Open conartist6 opened 3 years ago

conartist6 commented 3 years ago

It is not a well-known feature, but the spec allows both sync and async iterators to define a return() method. This in turn enables iterables that hold handles to underlying resources, such as files or connections. Since resources behind handles are usually accessed (in modern styles) through async APIs, I'm particularly interested in the AsyncIterator.return method, which returns a promise. If such a method exists, there is no way to know that it is safe to skip calling it, and it seems unwise to me not to await the promise it returns: without the await, errors raised during cleanup escape their context and bubble up to the global unhandled promise rejection handler. Thus, in order to be able to create Observables from async iterators that hold underlying resources, unsubscribe should be able to return a promise.
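To make the cleanup path concrete, here is a minimal sketch of an async generator whose `finally` block releases a resource. `makeResource`, `readChunks`, and `demo` are hypothetical names invented for illustration; the only standard behavior relied on is that an async generator's return() returns a promise that settles after the `finally` block has run.

```js
// Hypothetical resource that records whether it has been released.
function makeResource() {
  return {
    closed: false,
    async close() {
      this.closed = true;
    },
  };
}

// Async generator whose `finally` block performs async cleanup.
async function* readChunks(resource) {
  try {
    yield "chunk 1";
    yield "chunk 2";
  } finally {
    // If close() rejected, the promise returned by return() would reject too.
    await resource.close();
  }
}

async function demo() {
  const resource = makeResource();
  const iter = readChunks(resource);

  // Advance once so the generator is suspended inside the `try` block.
  await iter.next();

  // return() resumes the generator with a return completion, which runs
  // `finally`. Awaiting it keeps any cleanup error in this call stack.
  const result = await iter.return();
  return { result, closed: resource.closed };
}
```

Note that if return() is called before the first next(), the generator body has not started, so the `finally` block does not run at all.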

conartist6 commented 3 years ago

I guess the question is: what's the point? Is it timing, so that other operations are sequenced after underlying resources are released? Is it error handling? Will subscribe code know how to handle cleanup failure in a meaningful way?

runarberg commented 3 years ago

For fun I implemented fromEvent as an async iterator:

Implementation

```js
/**
 * @template {EventTarget} Target
 * @param {string} type
 * @param {Target} target
 * @returns {AsyncGenerator}
 */
export default function fromEvent(type, target) {
  /** @type {Event[]} */
  const buffer = [];
  let isListening = true;

  /** @type {EventListener} */
  function handler(event) {
    buffer.push(event);
  }

  function cleanUp() {
    isListening = false;
    target.removeEventListener(type, handler);
  }

  target.addEventListener(type, handler);

  return {
    return() {
      cleanUp();

      return Promise.resolve({ value: undefined, done: true });
    },

    throw(error) {
      cleanUp();

      return Promise.resolve({ value: error, done: true });
    },

    async next() {
      // eslint-disable-next-line no-unmodified-loop-condition
      while (buffer.length === 0 && isListening) {
        await new Promise((resolve) => globalThis.setTimeout(resolve));
      }

      if (!isListening) {
        return { value: undefined, done: true };
      }

      const value = /** @type {Event} */ (buffer.shift());

      return { value, done: false };
    },

    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```
Test Cases

```js
import test from "ava";

import fromEvent from "../from-event.js";

test("from event", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();

  const fooEvents = [new Event("foo"), new Event("foo")];

  target.dispatchEvent(fooEvents[0]);
  target.dispatchEvent(fooEvents[1]);

  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });
});

test("ok on empty buffer", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();

  const fooEvents = [new Event("foo"), new Event("foo")];

  globalThis.setTimeout(() => {
    target.dispatchEvent(fooEvents[0]);
    target.dispatchEvent(fooEvents[1]);
  }, 50);

  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });
});

test("`.return` removes the listener", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();

  const fooEvents = [new Event("foo"), new Event("foo"), new Event("foo")];

  target.dispatchEvent(fooEvents[0]);
  target.dispatchEvent(fooEvents[1]);

  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });

  listener.return();
  target.dispatchEvent(fooEvents[2]);

  t.is((await iter.next()).done, true);
});

test("`.throw` removes the listener", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();

  const fooEvents = [new Event("foo"), new Event("foo"), new Event("foo")];

  target.dispatchEvent(fooEvents[0]);
  target.dispatchEvent(fooEvents[1]);

  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });

  listener.throw(new Error("error"));
  target.dispatchEvent(fooEvents[2]);

  t.is((await iter.next()).done, true);
});
```

I think fromEvent is better abstracted as an observable than as an async iterator, even though it might be nicer to consume inside a for await ... of loop. You’ll notice there are a few footguns in this implementation, including a potential infinite loop if we forget to change the isListening flag (if a child iterator closes when the event buffer is empty).

You will also notice that there is a subtle difference in behavior if more than one event is dispatched in the same tick. An observable will run its handler in the same tick, but an async iterator will always wait until a later tick to run it. This subtle difference might be important in user code where handling one event alters how subsequent events are fired.
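The timing difference can be observed with a small sketch. `compareTiming` is a hypothetical helper, and a plain promise stands in for a pending `next()` call. It assumes an environment with global `EventTarget` and `Event` (browsers, recent Node.js).

```js
async function compareTiming() {
  const target = new EventTarget();
  const log = [];

  // Observable-style consumer: runs synchronously inside dispatchEvent.
  target.addEventListener("foo", () => log.push("sync listener"));

  // Async-iterator-style consumer: the listener only resolves a promise,
  // so the consuming code resumes in a later microtask at the earliest.
  let resolveNext;
  const next = new Promise((resolve) => {
    resolveNext = resolve;
  });
  target.addEventListener("foo", (event) => resolveNext(event));
  const consumer = next.then(() => log.push("async consumer"));

  target.dispatchEvent(new Event("foo"));
  log.push("after dispatch");

  await consumer;
  return log; // ["sync listener", "after dispatch", "async consumer"]
}
```

With the polling implementation above, the delay is longer still, since `next()` only rechecks the buffer after a `setTimeout` callback fires.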

Thirdly, notice that I have to create my own polling loop that runs once per tick. I’m not an engine expert so this might be minor, but I do wonder about the performance implications of this, especially if the work done in this loop is non-trivial, or if an inexperienced (or tired) programmer fails to find an efficient way to check whether any action is needed.

conartist6 commented 3 years ago

@runarberg I don't think we're talking about the same thing. You're talking about making Observables into async iterables. I'm talking about making async iterables into observables. In general I agree that async iteration is not well suited to the general pubsub use case, particularly when there are multiple subscribers. For one thing there's the problem you mention with sync things becoming async, which leads to lost performance (and the potential for flickering behavior in UIs). Another problem is that stack traces are lost, and it becomes hard to tell why things happened. Also, the order in which subscribers are notified about an event is then determined by some very hidden engine internals and may (or may not) be essentially random.

Since async iterables have these problems, if they are the data source it may make sense to convert them to observables when there are multiple subscribers interested in the "events" (resolution of iterator step promises) they produce. My wish is that this be possible, and that an observable created from a generic async iterator be able to ensure that the async underlying iterator is cleaned up correctly.
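A rough sketch of what this could look like follows. `subscribeToAsyncIterable` and the observer shape (`next`, `error`, `complete`) are assumptions made for illustration, not the API of the Observable proposal. The point is only that the returned unsubscribe function is async, so the caller can await cleanup of the underlying iterator and catch its errors.

```js
// Hypothetical helper: pushes values from an async iterable to an observer
// and returns an unsubscribe function that resolves once cleanup is done.
function subscribeToAsyncIterable(iterable, observer) {
  const iterator = iterable[Symbol.asyncIterator]();
  let closed = false;

  (async () => {
    try {
      while (!closed) {
        const { value, done } = await iterator.next();
        if (closed) break; // unsubscribed while next() was pending
        if (done) {
          closed = true;
          observer.complete?.();
          break;
        }
        observer.next?.(value);
      }
    } catch (error) {
      if (!closed) {
        closed = true;
        observer.error?.(error);
      }
    }
  })();

  return async function unsubscribe() {
    if (closed) return;
    closed = true;
    // Not every async iterator defines return(); call it only if present.
    if (typeof iterator.return === "function") {
      await iterator.return(); // a cleanup failure rejects this promise
    }
  };
}
```

A caller would write `await unsubscribe()` inside a try/catch, so that a failed cleanup is handled where the subscription was torn down. For async generators, a return() issued while a next() is pending is queued and runs after that step settles.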