tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/

Reduced API with async/await support #205

Closed alshdavid closed 3 years ago

alshdavid commented 4 years ago

Simpler API with support for async/await through the use of an object iterator.

This is inspired by Go's channels. It's intended that this:

Piping operators, creating "subjects" and applying a "subscribe" function can happen in userland through libraries.

Example Implementation Codesandbox

Example usage:

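const numbers = new Observable(next => {
  let count = 0
  // setInterval calls its callback without arguments, so emit the counter explicitly
  const interval = setInterval(() => next(count++), 1000)
  return () => clearInterval(interval)
})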

for await (const number of numbers) {
  console.log(number)
  if (number === 4) {
    break
  }
}

API

interface Observable<T> {
    [Symbol.Iterator]: Promise<T>
}

type TeardownFunc = () => void

type SetupFunc<T> = (
  next: (value: T) => void,
  complete: () => void,
) => TeardownFunc

type ObservableConstructor<T> = (
   setup: SetupFunc<T>
) => Observable<T>

hax commented 4 years ago

Shouldn't it be AsyncIterator?

ForsakenHarmony commented 4 years ago

How do you cancel it?

alshdavid commented 4 years ago

To cancel the subscription, you simply break out of the loop.

ForsakenHarmony commented 4 years ago

In your case the clearInterval never gets called because there seems to be no way to cancel from the consumer?

alshdavid commented 4 years ago

Iterators can have setup and teardown logic

class Observable {
  async *[Symbol.asyncIterator]() {
    // Setup logic runs when a loop starts
    try {
      for (const i of [1, 2, 3, 4, 5]) {
        yield i
      }
    } finally {
      // Teardown logic runs once, when the loop finishes or is broken
    }
  }
}

When someone starts iterating over the object you can check if there are any subscribers already, if not, run the setup logic.

When a loop is broken, you can check if the number of subscribers is 0; if it is, run the teardown logic.
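
The subscriber-counting idea described above can be sketched roughly as follows. This is a minimal illustration and not the linked implementation: the class name `SharedObservable` and the `subscribers`, `queue`, and `wake` internals are assumptions made for the example. Each loop gets its own queue, the setup function runs when the first loop starts, and the teardown function runs when the last loop exits.

```javascript
// Hypothetical sketch: names and internals are assumptions, not the linked code.
class SharedObservable {
  constructor(setup) {
    this.setup = setup            // (next, complete) => teardown function
    this.subscribers = new Set()
    this.teardown = null
  }

  async *[Symbol.asyncIterator]() {
    // Every consuming loop gets its own queue and wake-up callback.
    const sub = { queue: [], done: false, wake: () => {} }
    this.subscribers.add(sub)

    // First subscriber: run the setup logic once and fan values out to all loops.
    if (this.subscribers.size === 1) {
      this.teardown = this.setup(
        value => {
          for (const s of this.subscribers) { s.queue.push(value); s.wake() }
        },
        () => {
          for (const s of this.subscribers) { s.done = true; s.wake() }
        },
      )
    }

    try {
      while (true) {
        // Drain queued values, then sleep until the producer pushes again.
        while (sub.queue.length) yield sub.queue.shift()
        if (sub.done) return
        await new Promise(resolve => { sub.wake = resolve })
      }
    } finally {
      // Runs on break, return, or throw in the consuming loop.
      this.subscribers.delete(sub)
      if (this.subscribers.size === 0 && this.teardown) {
        this.teardown()
        this.teardown = null
      }
    }
  }
}
```

Note that each queue in this sketch is unbounded, so a consumer that is slower than the producer accumulates values in memory.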

You can see an imperfect implementation here https://github.com/alshdavid/reactive-channels/blob/master/src/private-channel.ts

MDN Docs

benlesh commented 4 years ago

Since observables are all push-based, there are memory and back pressure concerns unless you use a lossy strategy to adapt the incoming values into the asyncIterator. I have a proposal to add this as a feature to RxJS, but it's not without its drawbacks.
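
To make the "lossy strategy" idea concrete, here is one possible sketch that keeps only the most recent value. The function name `latestValues` and the shape of its `subscribe` argument (a function that takes a `next` callback and returns an unsubscribe function) are assumptions for illustration; this is not the RxJS feature mentioned above.

```javascript
// Hypothetical adapter: `subscribe(next)` must return an unsubscribe function.
async function* latestValues(subscribe) {
  const EMPTY = Symbol('empty')
  let latest = EMPTY
  let wake = () => {}

  const unsubscribe = subscribe(value => {
    latest = value   // overwrite: any older unread value is dropped
    wake()
  })

  try {
    while (true) {
      // Sleep only when nothing has arrived since the last pull.
      if (latest === EMPTY) await new Promise(resolve => { wake = resolve })
      const value = latest
      latest = EMPTY
      yield value
    }
  } finally {
    // Runs when the consuming loop breaks or throws.
    unsubscribe()
  }
}
```

Memory use stays constant because at most one value is held, at the cost of silently dropping values the consumer was too slow to read.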

Otherwise, what you've proposed here can be accomplished with async function*.
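
As a rough sketch of that point, the interval example from the original post could be written with an async generator along these lines. The names `ticker` and `main` are hypothetical, and the `finally` block stands in for the teardown function.

```javascript
// Hypothetical helper: emits 0, 1, 2, ... roughly every `ms` milliseconds.
async function* ticker(ms) {
  let count = 0
  try {
    while (true) {
      await new Promise(resolve => setTimeout(resolve, ms))
      yield count++
    }
  } finally {
    // Reached when the consumer breaks out of its loop.
    console.log('ticker stopped')
  }
}

async function main(ms = 1000) {
  const seen = []
  for await (const number of ticker(ms)) {
    console.log(number)
    seen.push(number)
    if (number === 4) break
  }
  return seen
}
```

Calling `main()` logs 0 through 4 and then `ticker stopped`. Because the generator is pull-based, the next timer only starts when the loop asks for another value, so nothing accumulates between iterations.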

dy commented 4 years ago

Not sure where to put this (sorry if it's off-topic), but Observable can serve as a handy foundation for async iterables. They play well together, for example:

  async *[Symbol.asyncIterator]() {
    // buffer every pushed value and wake the loop if it is waiting
    let buf = [], wake = () => {},
      unsubscribe = this[Symbol.observable]().subscribe(
        v => (buf.push(v), wake())
      )

    try {
      while (1) {
        // drain the buffer first so values pushed during a yield are not delayed
        while (buf.length) yield buf.shift()
        await new Promise(r => wake = r)
      }
    } finally {
      unsubscribe()
    }
  },
  [Symbol.observable]: () => ({
    // accepts a plain function or an observer object with a next() method
    subscribe: observer => {
      let next = typeof observer === 'function' ? observer : v => observer.next(v),
        handler = e => next(e.detail.props),
        unsubscribe = () => el.removeEventListener('evt', handler)
      el.addEventListener('evt', handler)
      return unsubscribe.unsubscribe = unsubscribe
    },
    [Symbol.observable]() { return this }
  })

Fusing subscription logic directly into asyncIterator would be confusing; having it in the lazy observable.subscribe is neat, though it would be nice to simplify the API. Also, the sync nature of observables helps avoid many async-related issues; it gets tricky with asyncIterators sometimes.

@benlesh just curious, do you have an idea what the main blocker for the proposal is?

richytong commented 4 years ago

Observables are not async iterables. Though they do compete for the same streaming data model, it is not right to try to base either one off of the other unless you want spaghetti code. I write more about this here

dy commented 4 years ago

@richytong Observable is invaluable for events, and it is sync, which makes it different from async iterable. I also find it easier to provide minimal subscription code with an observable than to deal with async generators and promise resolution. All I wanted to say is that if some structure implements Symbol.observable, it is fairly easy to add async iterable support (for await); the opposite is not necessarily true.

it is not right to try to base either one off of the other unless you want spaghetti code

I'm not sure I got the argument from the article about why your observable code turns into spaghetti. I had the opposite experience (regardless of RxJS) with spect and zen-observable: Symbol.observable acts as an implicit reactive layer for any data structure, making code tidy.

richytong commented 4 years ago

@dy Observable may be invaluable for events, but there are safer and less painful ways via async iterables to do what observables are currently doing.

Also, declaring an observable data structure may be sync, but you have to think about the events coming through observables as async; those events could come in at any time.

I was referring to @benlesh's new library as spaghetti because he tries to turn observables into async iterables. That library is not one function that turns observables into async iterables; it is four functions providing four separate fallback behaviors in the case that Observables run into backpressure issues. These are the same backpressure issues that async iterables do not have to deal with.

dy commented 4 years ago

less painful ways via async iterables to do what observables are currently doing.

Can you give an example? In my experience that's not the case. During the spect@15 iteration I found that making many async iterables work properly requires a promise resolution loop with buffering (one way or another), which is not trivial:

  async *[Symbol.asyncIterator]() {
    // emit initial value
    if (get) yield get()
    let buf = [], resolve, promise = new Promise(r => resolve = r)
    try {
      while (1) {
        await promise
        // from the moment promise was resolved until now, there could be an extra code that resolved promise one more time
        while (buf.length) {
          yield buf.pop()
          // from the moment yield got control back, the buf could've been topped up again
        }
      }
    } catch (e) {
    } finally {
    }
  },

Observables don't have that issue of missing events between ticks. Also, this async iterator code builds naturally on an existing [Symbol.observable]; otherwise you would have to mix various concerns (subscribe, promise, buffer, batch?, skip?) in a single [Symbol.asyncIterator] method, which may turn the code into a mess.

RxJS is difficult, yes, but Observable is not: it is a minimal language primitive for the reactivity concern (you're welcome to invent a way to minimize it further). I believe it's a win for everybody: for standardizing the vast variety of reactive libs, and as a subscription provider for async iterators. The Observable pattern emerges in asyncIterators anyway (for free!), so why not make it a standard?

With Observable handling the subscription concern, the async iterator can focus on meaningful optimizations such as skipping duplicates or unchanged state, batching multiple updates, screening, throttling, etc.

Observables run into backpressure issues.

I had an impression that observable is push-stream and should not care about consumer.

obedm503 commented 4 years ago

From learning RxJS, I've understood that Observable is used for push-based streams while AsyncIterator is used for pull-based streams.
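
The push/pull distinction can be illustrated with a small sketch. The function names `pushNumbers`, `pullNumbers`, and `demo` are made up for this example and no library is involved: in the push case the producer calls the consumer whenever it likes, while in the pull case nothing is produced until the consumer asks.

```javascript
// Push: the producer decides when values are delivered.
function pushNumbers(observer) {
  for (const n of [1, 2, 3]) observer(n)   // delivered immediately, ready or not
}

// Pull: the consumer decides when the next value is produced.
async function* pullNumbers(log) {
  for (const n of [1, 2, 3]) {
    log.push(`produce ${n}`)
    yield n
  }
}

async function demo() {
  const log = []
  pushNumbers(n => log.push(`push ${n}`))

  const iterator = pullNumbers(log)
  log.push('nothing produced yet')          // the generator body has not started
  const first = await iterator.next()       // the first pull runs it up to the first yield
  log.push(`pulled ${first.value}`)
  return log
}
```

`demo()` resolves to `['push 1', 'push 2', 'push 3', 'nothing produced yet', 'produce 1', 'pulled 1']`: all three pushed values arrive before the consumer does anything else, whereas the generator produces exactly one value per request.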

richytong commented 4 years ago

@dy Just because something is difficult doesn't mean you have to learn it. RxJS has made your life hard and yours too @obedm503 because the entire library is built up around the Observable. While Observables certainly sound nice on paper (lazy evaluated, multiple push source, cool streams), your computer will run into memory issues due to backpressure if anything downstream of your Observable needs to do something in series with the events at scale.

I had an impression that observable is push-stream and should not care about consumer.

You are absolutely right about this. Observables really don't care about their consumers. That's why Node.js streams, which are analogous to Observables, have run into many pains in this regard. I quote @benlesh:

Since observables are all push-based, there are memory and back pressure concerns unless you use a lossy strategy to adapt the incoming values into the asyncIterator. I have a proposal to add this as a feature to RxJS, but it's not without its drawbacks.

^ that turned into this library. Four strats for the price of your sanity. This is fine and dandy for a userland lib, but if we're talking spec, there needs to be something more robust. Actually, there is something - AsyncIterable - and it's already in the spec. The only thing it's missing is a nice API to consume it.

Can you give an example?

Here you are, from my userland lib

const { map, transform } = require('rubico')

const asyncGenerator = async function*() { yield 1; yield 2; yield 3 }

const square = x => x ** 2

transform(map(square), [])(asyncGenerator()) // Promise resolving to [1, 4, 9]

My approach is not new; it uses transducers, which hail from the land of Functional Programming. If you want something hard to wrap your head around but rewarding as hell, learn about transducers. My approach transforms the AsyncIterable created from asyncGenerator into an array. Some pros

During the spect@15 iteration I found that making many async iterables work properly requires a promise resolution loop with buffering

You really don't need buffering with async iterables. The consumer will pull the next item when it is ready. What exactly are you making into an async iterable in your example?
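
A small sketch of the case where this holds: when the source itself can be pulled (a paged API, a cursor, a file), the generator does no work until the consumer asks. `fetchPage`, `pages`, and `fetchCount` below are hypothetical stand-ins for such a source. Sources that push on their own schedule, such as DOM events, are the case where something still has to hold values between pulls.

```javascript
// Hypothetical pull-friendly source standing in for a paged HTTP API or cursor.
const pages = [[1, 2], [3, 4], [5, 6]]
let fetchCount = 0

async function fetchPage(index) {
  fetchCount++
  return pages[index]
}

async function* items() {
  for (let index = 0; index < pages.length; index++) {
    // The next page is fetched only after the consumer has used up the previous one.
    yield* await fetchPage(index)
  }
}

async function firstThree() {
  const seen = []
  for await (const item of items()) {
    seen.push(item)
    if (seen.length === 3) break
  }
  return seen
}
```

`firstThree()` resolves to `[1, 2, 3]` after only two calls to `fetchPage`; the third page is never requested because the consumer stopped pulling.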

benjamingr commented 4 years ago

@richytong hey, as Ben and others know I have been very critical of Rx in many places so this is not me jumping to defend it because I am a fanboy.

Your message is FUD. This whole thread shows a fundamental lack of research and understanding of the material. You make several unsubstantiated claims and several claims that simply show you did not grok Rx (for example it is the opposite of lazy after you subscribe so saying it is nice because it is lazy is one such fundamental misunderstanding).

Please read the meeting notes and docs. Promises are not suitable for this: events have to be synchronous. The reason why has been discussed at least 20-30 times in this repo alone...

Attacking Ben to plug your library is extremely bad manners IMO. Please stop, do your research and contribute more constructively.

For those of you asking why these proposals are stalled: these sorts of attacks, which take a big chunk of emotional energy from proposal champions, are a big part of why.

benjamingr commented 4 years ago

Also as this proposal is mostly dead in its current form anyway (afaik) discussing these things here is mostly pointless :)

richytong commented 4 years ago

@benjamingr Thank you for setting me straight and for your recommendations. I will take a look and focus more on my own contributions.

benjamingr commented 4 years ago

Ok, your response was well mannered enough to not leave you "hanging" and I couldn't actually find all the answers in this repo so I figured I'll recap some points worth mentioning:

For what it's worth - a lot of the API quirkiness is because of the "duality" of observable to LINQ, you can see this (now old) talk about it. A lot of the API was inspired by C#'s initially.

This repo contains a version of the proposal that is intentionally very small and contains as few features as possible in order to allow for interop and extensions. The goal of es-observable was to add a language level primitive so that libraries (like RxJS and presumably your library) could interop (easily) via a shared interface (Symbol.observable or Symbol.observer or whatever ended up last). It was supposed to usher in a new era the same way the Promises/A+ spec did for promises and allow for interop and an ecosystem.


Of course, you are welcome to use async iterables for whatever you want and write libraries. This (mostly dead) repo isn't "async iterable vs. observable" or "observable vs. promise" or "one observable library vs. another"; it's just a "minimal observable API" so that platforms like Node and browsers (via DOM) can adopt observables in their APIs.

dy commented 4 years ago

this proposal is mostly dead in its current form

The proposal is used by hundreds of packages if not more (9M+ downloads per week), it has multiple implementations and growing demand - ecosystem and many opinionated reactive implementations would only win from the standard. That's live in userland.

benjamingr commented 4 years ago

@dy to clarify, when I wrote:

this proposal is mostly dead in its current form

I meant "the push to promote this specification in its current form to the ECMAScript language at its current iteration".

I am very much still interested in this and I am interested in standardization so Node APIs are automatically convertible to observables.

alshdavid commented 4 years ago

[Redacted]

benjamingr commented 4 years ago

@alshdavid did you read the rest of the discussion in the issue? It looks like you are still arguing for async semantics which are a no-go (explained now 21-31 times in the repo and not 20-30 after https://github.com/tc39/proposal-observable/issues/205#issuecomment-643665719 )

alshdavid commented 4 years ago

Yep sorry, redacted my comment in response.

What is the expectation surrounding Observables in the spec? Is there a primitive type we can use currently or can expect to use as a target for rxjs-like functionality?

benjamingr commented 4 years ago

What is the expectation surrounding Observables in the spec?

Mostly stalled I think?