tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/

Should Observable implement `Symbol.asyncIterator`? #180

Open benlesh opened 6 years ago

benlesh commented 6 years ago

FWIW: I feel like I've asked this before, but I can't find the issue.

Basically the idea would be to support for-await language features. It's sure to pop up in user land, we're experimenting with this in RxJS right now.

The default behavior would be to buffer values and send them one at a time. This comes with the obvious downside of being a source of potential memory growth (because of the buffering behavior and allocating a promise at each turn).
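To make the buffering concrete, here is a minimal sketch of what such a default adapter might look like. The `ObservableLike` shape and the `toAsyncIterable` name are assumptions made for illustration; they are not part of the proposal or of RxJS.

```typescript
// Minimal observable shape assumed for this sketch (not the proposal's full API).
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface Subscription { unsubscribe(): void; }
interface ObservableLike<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical helper: adapts a push source to a pull-based async iterator by
// queueing values until the consumer asks for them. The queue is unbounded,
// which is exactly the memory-growth hazard described above.
async function* toAsyncIterable<T>(source: ObservableLike<T>): AsyncGenerator<T> {
  const queue: T[] = [];
  const state = { done: false, failed: false, error: undefined as unknown };
  let wake: (() => void) | null = null;
  const notify = () => {
    if (wake) {
      const w = wake;
      wake = null;
      w();
    }
  };

  const subscription = source.subscribe({
    next(value) { queue.push(value); notify(); },
    error(err) { state.failed = true; state.error = err; notify(); },
    complete() { state.done = true; notify(); },
  });

  try {
    while (true) {
      // Hand out everything buffered so far, one value per pull.
      while (queue.length > 0) yield queue.shift() as T;
      if (state.failed) throw state.error;
      if (state.done) return;
      // Nothing buffered: sleep until the source pushes again.
      await new Promise<void>(resolve => { wake = () => resolve(); });
    }
  } finally {
    // Runs on completion, on error, and when the consumer breaks out of the loop.
    subscription.unsubscribe();
  }
}
```

If the loop body awaits for longer than the interval between pushes, `queue` simply keeps growing; nothing in this sketch tells the source to slow down.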

The upside is that subscription with for-await is pretty cool, and I think it rounds this out as a language feature nicely. (I'm a fan of interop.)

for await (const x of someObservable) {
  console.log(x);
}

zenparsing commented 6 years ago

I think there's precedent for this kind of buffering in DOM Streams already. cc @domenic?

domenic commented 6 years ago

We haven't gotten around to implementing it in Streams, but it's definitely the plan.

It's more of a no-brainer there because streams already maintain their own internal buffer, and have mechanisms for signaling backpressure, so e.g. if you await inside the for loop body, the flow will slow.

For observables it's a bit more of a footgun, because it leads to buffering with no backpressure mechanism. Whereas, if we just restricted people to forEach(), then they can't await inside of it, which is a bit better. (Or, well, they can, by passing an async function; just nobody will use the returned promise, and the callback might be called again while the async function is paused.)
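The parenthetical about passing an async function can be illustrated with a small sketch. `pushAll` is a hypothetical stand-in for a forEach-style push API that ignores the callback's return value; it is an assumption for illustration, not the proposal's `forEach`.

```typescript
const tick = () => new Promise<void>(resolve => setTimeout(resolve, 0));

// Hypothetical push source: delivers every value synchronously and ignores
// whatever the callback returns, including a promise.
function pushAll<T>(values: T[], callback: (value: T) => unknown): void {
  for (const value of values) callback(value);
}

async function demo(): Promise<string[]> {
  const log: string[] = [];

  pushAll([1, 2], async value => {
    log.push(`start ${value}`);
    await tick(); // the source does not wait for this
    log.push(`end ${value}`);
  });

  // Give both callbacks time to finish before reading the log.
  await tick();
  await tick();
  return log; // → ["start 1", "start 2", "end 1", "end 2"]
}
```

The second call starts before the first has finished, so the callbacks overlap rather than run serially, and nothing observes the promises they return.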

I'm on the fence here, and could be persuaded either way.

benlesh commented 6 years ago

It sort of makes me wish we had @jhusain's for..on syntax... but as a language feature, could we not stipulate that for await..of does what @jhusain proposed for observables if it's operating over an observable? Seems like that would eliminate the footgun, and it shouldn't result in behavior that would surprise anyone.

Basically I'm now thinking that for await(const x of someObservable) { foo(x) } could "just work" without a Symbol.asyncIterator implementation, (maybe because Symbol.observable exists).

I realize that's probably out of scope for this proposal.. but is it? @domenic?

domenic commented 6 years ago

The idea behind for-await-of is specifically that it works in async functions, interspersed with awaits. If we wanted syntax for observables, we'd specifically not want to allow await to work within the body of the loop, because of the issues I noted above.

Overall I'm not sure "looping over" an observable makes as much conceptual sense, so adding or adapting looping syntax is pretty tricky. subscribe(x => ...) is much clearer in that it's a function, which the creator of the observable will call zero or many times in the future. Trying to say that the block body of a loop is really a function-in-disguise that the creator of the observable will call zero or many times in the future is a bit weird.

domenic commented 6 years ago

What I mean by the above is:

Basically: it's slightly broken to make observables async iterables, but trying to fix that brokenness leads into weird and confusing places, so we're probably better off just doing the straightforward thing.

zenparsing commented 6 years ago

Thanks @domenic.

I'm personally leaning towards thinking that the footgun potential is too high. After all, async/await is designed for easy ("you don't need to worry about the async part") async programming. We might not want to tuck away important, tricky, and potentially problematic subtleties like buffering behind that easy facade. It might be better to make users explicitly opt-in to a buffering strategy.

On the other hand, I think it would be great if Observable.from could create an observable from an async iterator.
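As a rough sketch of that direction, the conversion from an async iterator to an observable might look like the following. `fromAsyncIterable` and the `ObservableLike` shape are hypothetical names used for illustration; this is not how `Observable.from` is specified.

```typescript
// Minimal observable shape assumed for this sketch (not the proposal's full API).
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface Subscription { unsubscribe(): void; }
interface ObservableLike<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical helper: pulls from the async iterable and pushes each value to
// the observer. Each subscribe() call requests a fresh iterator.
function fromAsyncIterable<T>(iterable: AsyncIterable<T>): ObservableLike<T> {
  return {
    subscribe(observer) {
      let closed = false;
      const iterator = iterable[Symbol.asyncIterator]();

      (async () => {
        try {
          while (!closed) {
            const result = await iterator.next();
            if (closed) return; // unsubscribed while waiting
            if (result.done) {
              observer.complete();
              return;
            }
            observer.next(result.value);
          }
        } catch (err) {
          if (!closed) observer.error(err);
        }
      })();

      return {
        unsubscribe() {
          closed = true;
          // Let the iterator release its resources, if it supports that.
          void iterator.return?.();
        },
      };
    },
  };
}
```

No buffer is needed in this direction: the next value is only requested after the previous one has been handed to the observer.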

benlesh commented 6 years ago

> we'd specifically not want to allow await to work within the body of the loop, because of the issues I noted above.

Ah, yeah, that makes sense. Haha... I withdraw my suggestion.

However, if we're OK with a slight footgun, it might be fine to just allow easy adaptation from observables to async iterators, and thus automatically make them work with for-await-of.

It seems like this sort of footgun could just as easily be created by anyone's async function*, depending on what they're doing. So I'm in favor of this addition, if for no other reason than to make observable cooperate a little more nicely with the language.

While I haven't seen much demand for this in RxJS, I haven't seen many for await loops in production apps yet, either. I do know that with common observable use, the vast majority of the time the values will arrive way too slow to cause any sort of back pressure issues.

zenparsing commented 6 years ago

> It seems like this sort of footgun could just as easily be created by anyone's async function*, depending on what they're doing

How so? Can you give an example?

benlesh commented 6 years ago

> How so? Can you give an example?

I think "just as easily" is probably the part you're questioning here. I suppose it's not as easy, hahaha. But basically any code where someone might be buffering values waiting for someone to pull them out. Maybe a wrapper around a socket or something... or if someone tried to implement mousemoves in terms of async iterables.

The more I think about this, the more I think it's probably not a great idea. It's probably better handled via some sort of explicit function where you can specify a strategy.
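One possible shape for such an explicit function is sketched below. The name `toBoundedAsyncIterable`, the `maxBuffer` option, and the three overflow policies are all assumptions chosen for illustration; nothing like this is specified in the proposal.

```typescript
// Minimal observable shape assumed for this sketch (not the proposal's full API).
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface Subscription { unsubscribe(): void; }
interface ObservableLike<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical overflow policies for a bounded buffer.
type Overflow = "drop-oldest" | "drop-newest" | "error";

async function* toBoundedAsyncIterable<T>(
  source: ObservableLike<T>,
  options: { maxBuffer: number; overflow: Overflow },
): AsyncGenerator<T> {
  const queue: T[] = [];
  const state = { done: false, failed: false, error: undefined as unknown };
  let wake: (() => void) | null = null;
  const notify = () => {
    if (wake) {
      const w = wake;
      wake = null;
      w();
    }
  };
  const fail = (err: unknown) => {
    state.failed = true;
    state.error = err;
    notify();
  };

  const subscription = source.subscribe({
    next(value) {
      if (state.done || state.failed) return;
      if (queue.length >= options.maxBuffer) {
        if (options.overflow === "drop-newest") return; // ignore the new value
        if (options.overflow === "error") {
          fail(new Error("buffer overflow"));
          return;
        }
        queue.shift(); // "drop-oldest": make room for the new value
      }
      queue.push(value);
      notify();
    },
    error(err) { fail(err); },
    complete() { state.done = true; notify(); },
  });

  try {
    while (true) {
      // Values buffered before a failure are still delivered first.
      while (queue.length > 0) yield queue.shift() as T;
      if (state.failed) throw state.error;
      if (state.done) return;
      await new Promise<void>(resolve => { wake = () => resolve(); });
    }
  } finally {
    subscription.unsubscribe();
  }
}
```

With this shape the caller has to choose what happens when the consumer falls behind, instead of getting unbounded buffering by default.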

I'm going to close this one. Thank you for helping me explore this a little further. It's a shame, because 99.9% of the time I don't think it would be a problem, and it seems like it would be a nice bit of ergonomics.

zenparsing commented 6 years ago

@benlesh Maybe we should leave the issue open just to show that it's still something we might want to continue to think about a bit.

I agree it would be pretty cool if it just worked.

domenic commented 6 years ago

Yeah, one thing that would help me one way or the other is getting a sense of whether async iterables in general end up handling backpressure, or just buffering indefinitely. We know streams will handle backpressure, as will other "pull" APIs such as directory listing, but we don't have a lot of experience with other cases yet. If it turns out to be common that such buffering occurs, such that users need to be aware of the hazard anyway, then it seems fine for observables too. I'm just not sure yet.

benlesh commented 6 years ago

> If it turns out to be common that such buffering occurs, such that users need to be aware of the hazard anyway,

I suspect this will start happening when users start trying to coordinate an async await with a stream of user events. "Every time this button is clicked, execute this logic, but one at a time, in a serial fashion" as defined with a for await loop.

async function foo() {
  // assuming button.on('click') returns an observable of click events
  for await(let click of button.on('click')) {
    const data = await getData();
    doSomething(data);
    const data2 = await getData2();
    doOtherStuff(data2);
  }
}

If someone wanted to do this currently, they'd have to write some sort of buffering mechanism for button clicks. Having Observable implement Symbol.asyncIterator would help in this scenario, for sure. As the (non-trivial) buffering logic would be done for you.

To be clear, there are a dozen ways to do this, and some of them are probably better, but even in RxJS it ends up being a decent chunk of code that I'm not sure is more readable:

Observable.fromEvent(button, 'click').concatMap(() => getData().then(doSomething).then(getData2).then(doOtherStuff))

benjamingr commented 6 years ago

Node streams have implemented Symbol.asyncIterator and I've been using it in my code for all sorts of (non-performance intensive) stuff.

Turns out adapting the semantics works very well - and I'm definitely in favor of observables doing this. There are footguns - but they're relatively minor and things have been working out really well so far.

Also @benlesh see this issue

benjamingr commented 6 years ago

Also https://github.com/tc39/proposal-observable/issues/73

domenic commented 6 years ago

Node streams have backpressure support, though, so the issues discussed here do not apply.

zenparsing commented 6 years ago

It occurs to me that we already (almost) have a for-on (an async loop that does not allow await inside of it): forEach.

async function main() {
  await element.on('click').forEach(event => {
    doSomethingSync(event);
  });
}

The only thing missing is a way to break from within the "loop". We could provide that ability if we added the subscription as a second argument to the callback:

async function main() {
  await element.on('click').forEach((event, subscription) => {
    if (event.x > 100) {
      return subscription.unsubscribe();
    }
    doSomethingSync(event);
  });
}

benlesh commented 6 years ago

I like that idea. If we were going to round that out though, I'd also remove the start event and have subscription as the second argument to next. Just for consistency. (subscribe would still return the subscription)

zenparsing commented 6 years ago

I just tried this out, and just providing the subscription doesn't quite work for forEach. The issue is that I think we need to unsubscribe and resolve the promise with undefined.

We could provide a cancel function, which would wrap unsubscribe and also resolve the promise...

async function main() {
  await element.on('click').forEach((event, cancel) => {
    if (event.x > 100) {
      return cancel();
    }
    doSomethingSync(event);
  });
}

benlesh commented 6 years ago

@zenparsing ... Is the issue with the promise that you'll have a memory leak if it never resolves or rejects? 🤔I guess there's no way for the runtime to know and clean it up? What happens with a never-promise? new Promise(() => {})?

benjamingr commented 6 years ago

> @zenparsing ... Is the issue with the promise that you'll have a memory leak if it never resolves or rejects? 🤔I guess there's no way for the runtime to know and clean it up? What happens with a never-promise? new Promise(() => {})?

No, that works - I've repeated this experiment for most implementations and native promises - never resolved promises don't leak unless you explicitly retain a reference to them (which is very rare).

zenparsing commented 6 years ago

@benjamingr In reference to https://github.com/tc39/proposal-observable/issues/180#issuecomment-364584047, what's currently missing from forEach is a way to "break" out of the loop (similar to a break statement inside of a for-of loop).

If we just provided the subscription as the second argument to the forEach callback, that would give us a way to stop the flow of data into the forEach callback, but would not resolve the promise and allow code to continue executing after awaiting forEach. In other words, the subscription object is not sufficient for emulating break from forEach.

What would work instead is providing a "break" function as the second argument to the forEach callback, as implemented here.
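For readers without access to the linked implementation, the idea can be sketched roughly as follows. This is not that implementation; `forEachWithCancel` and the `ObservableLike` shape are hypothetical names used only to show why the "break" function has to both unsubscribe and resolve the promise.

```typescript
// Minimal observable shape assumed for this sketch (not the proposal's full API).
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface Subscription { unsubscribe(): void; }
interface ObservableLike<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical forEach variant: the callback receives a `cancel` function that
// stops the flow of values AND resolves the returned promise, like `break`.
function forEachWithCancel<T>(
  source: ObservableLike<T>,
  fn: (value: T, cancel: () => void) => void,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const state = { finished: false };
    let subscription: Subscription | undefined;

    const finish = (settle: () => void) => {
      if (state.finished) return;
      state.finished = true;
      subscription?.unsubscribe();
      settle();
    };
    const cancel = () => finish(() => resolve());

    subscription = source.subscribe({
      next(value) {
        if (state.finished) return;
        try {
          fn(value, cancel);
        } catch (err) {
          finish(() => reject(err));
        }
      },
      error(err) { finish(() => reject(err)); },
      complete() { finish(() => resolve()); },
    });

    // If the source emitted synchronously and we finished before `subscription`
    // was assigned, the unsubscribe above was skipped; do it now.
    if (state.finished) subscription.unsubscribe();
  });
}
```

Passing only the subscription would cover the `unsubscribe()` half; the promise would stay pending, so code after `await` would never run.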

staltz commented 6 years ago

What is the motivation for returning a promise from forEach?

zenparsing commented 6 years ago

@staltz Integration with async/await, mostly. Also, without the promise returned from forEach I would not find the current error-swallowing behavior acceptable. See here for that reasoning.

falsandtru commented 6 years ago

FYI, I designed observability without a queue as this:

  describe('cofetch', () => {
    it('basic', async () => {
      const xhr = await cofetch('');
      assert(xhr instanceof XMLHttpRequest);
    });

    it('cancel', done => {
      const co = cofetch('');
      co.cancel();
      co.catch(reason => {
        assert(reason instanceof Event);
        assert(reason.type === 'abort');
        done();
      });
    });

    it('progress', async () => {
      const co = cofetch('');
      for await (const ev of co) {
        assert(ev instanceof ProgressEvent);
      }
    });

  });

https://github.com/falsandtru/spica/blob/master/src/cofetch.test.ts https://github.com/falsandtru/spica/blob/master/src/coroutine.ts

TheCymaera commented 1 year ago

Yes, because in conjunction with this stage 3 proposal, Observables will get filter, map, and some other functions for free.

benlesh commented 1 year ago

FYI: We've decided in RxJS to have our Observable implement Symbol.asyncIterator, for the following reasons:

  1. rxjs-for-await, a library for converting observables to async iterables is pretty popular (npm downloads at around 700k/week)
  2. RxJS has had these exact semantics for the common case of awaiting each value from the observable and processing them asynchronously, in order, for a very, very long time (more than 10 years), and people rarely trip over it. concatMap is basically identical in function to what this is doing. Any hand-wringing about buffering values seems to be about a far enough corner case that no one has really run into it at scale in over a decade.

What I mean by that is these two bits of code are equivalent:

import { concatMap } from 'rxjs';

// Setup
const source$ = getSomeObservable();

function sleep(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

// For every value from `source$`, wait for a 1 second timer and then emit the value,
// but only one at a time, and preserve the order (note RxJS handles promises as returns in concatMap),
// then log them. In this case, note that we're not capturing the subscription,
// because we don't care about unsubscribing.
source$.pipe(concatMap(value => {
  // `then` needs a callback; passing `value` directly would not forward it
  return sleep(1000).then(() => value)
}))
.subscribe(console.log)

Which is much easier to read (IMO) in this manner:

// For every value from `source$`, wait for a 1 second timer and then log the value,
// but only one at a time, and preserve the order, we aren't really "subscribing" so we're not
// capturing the subscription, because we don't care about unsubscribing.
for await (const value of source$) {
  await sleep(1000)
  console.log(value)
}

If you wanted to unsubscribe from the observable in that async await, it's pretty straight-forward:

for await (const value of source$) {
  if (value > 10) {
    break; // unsubscribed.
  }
  await sleep(1000)
  console.log(value)
}

If you need something more advanced, in RxJS-world, you'd use takeUntil. That enables you to kill the loop with an event. Which in this promise-based case must throw something like an AbortError or whatever you'd want to call it:

try {
  for await(const value of source$.pipe(takeUntil(buttonClicks$))) {
    await sleep(1000)
    console.log(value)
  }
} catch (err) {
  // lame, but such are promises. 🤷 
  if (!(err instanceof AbortError)) {
    throw err;
  }
}

Related discussion in RxJS-land is here: https://github.com/ReactiveX/rxjs/discussions/6779