tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/
3.06k stars 90 forks source link

Proposal: Buffer notifications during subscription initialization #183

Open zenparsing opened 6 years ago

zenparsing commented 6 years ago

See #179 for background discussion.

Motivation

The subscribe protocol is complicated by the fact that the producer is able to send notifications to the consumer before the call to subscribe has completed. We call this "early emission". In general, the programmer must account for "early emission" by providing a start callback and obtaining a reference to the subscription object before any notifications are sent. In addition, the consumer may also need to remember to check if the subscription is closed after the call to subscribe completes.

"Early emission" has the following disadvantages:

The primary advantage of allowing "early emission" is that it enables the user to synchronously convert from in-memory data to an observable and then back to in-memory data. This technique is frequently used for unit testing. However, by using Observable.prototype.forEach and async/await, users can convert from observables to in-memory data without significantly altering their unit tests.

Goals

Proposal

1. Buffer notifications sent during subscription initialization

We can disallow "early emission", while providing interoperability with observable libraries that choose to allow it, by buffering all notifications that occur before the subscribe method has returned.

During the call to subscribe, if the producer sends a notification to the SubscriptionObserver, that notification is placed in a queue, and a job is scheduled to flush this queue. If additional notifications are sent to the SubscriptionObserver object before the queue is flushed, those notifications are also enqueued.

2. Send synchronous data in a job

We can modify Observable.of and Observable.from (when operating on iterables) to schedule a job to send data to the observer.

3. Remove support for the "start" method

Once we disallow "early emission", the start method is no longer required and can be removed from the API.

Additional Considerations

Implementation Prototype

jhusain commented 6 years ago

I'm open to ideas about how we can prevent sync emission if it will make Observables more ergonomic. Thanks for taking the time to prototype this, and I’ll take a closer look when I get a chance.

One concern I have is that I don’t want to create hazards when refactoring from Iterables to Observables. Consider the following code:

const ys = 
  Array.
    of(“a”,“b”,“c”).
    flatMap(x => 
      Array.
        of(1,2,3).
        map(y => [x,y]));

Now let’s consider the straightforward refactor to Observable.

const ys = 
  Observable.
    of(“a”,“b”,“c”).
    flatMap(x => 
     Observable.
        of(1,2,3).
        map(y => [x,y]));

I submit both programs above should produce the same output. Can this be assured if we inject concurrency into “of” and “from”? Furthermore would we be comfortable making this scheduling strategy the default?

I’m concerned that implicitly injecting concurrency into Observable could lead to notifications being processed in an unpredictable order. That is why one of the guiding principles of this proposal is currently to have the developer explicitly opt-in to concurrency.

zenparsing commented 6 years ago

I submit both programs above should produce the same output. Can this be assured if we inject concurrency into “of” and “from”?

I agree, and I believe that for this example we can prove the ordering will be maintained.

The resulting jobs are:

  1. All outer values are delivered. The mapped results are subscribed to.
  2. Values 1, 2, and 3 are delivered with closure over "a".
  3. Values 1, 2, and 3 are delivered with closure over "b".
  4. Values 1, 2, and 3 are delivered with closure over "c".

There still might be some interesting ordering effects if we mix delayed notification observables like the proposed Observable.of with observables that might notify earlier (e.g. an event observable where we call dispatchEvent in the current job). I'll try to explore this question over the next couple of days.

zenparsing commented 6 years ago

A refactoring from Array methods into Observable methods might run into ordering issues for an auto-flattening method, like the proposed Array.prototype.flatten.

If we assume both an Array.prototype.flatten, which deeply auto-flattens nested arrays, and an Observable.prototype.flatten which deeply auto-flattens observables, then refactoring from something like:

let result = Array.of(1, 2, Array.of(3, 4), 5).flatten(Infinity);
console.log(result); // [1, 2, 3, 4, 5]

to

let result = [];
await Observable.of(1, 2, Observable.of(3, 4), 5).flatten(Infinity).forEach(x => result.push(x));
console.log(result); // [1, 2, 5, 3, 4] ??

might result in some surprises (depending on how flatten is implemented, of course).

The analogy between Observable and Array/Iterable can only be stretched so far, though, (especially since we've consistently moved away from iterable/observable duality) and this might be a case where we say that flatten doesn't make sense for Observable.

benjamingr commented 6 years ago

I'm not sure I agree, I think flatten does make sense for observables - and I'd expect result to be [1, 2, 3, 4, 5].

The fact iterable/observable are not dual means that the signatures of the relevant interface aren't mirrors of each other - iterables and observables are still ordered which is an important property of them, and flatten should probably just wait on nested observables to complete() before moving to the next observable in this case.

staltz commented 6 years ago

This divergence of behavior wouldn't occur if Observables would have the synchronous (recursive) scheduler by default, which is the case in RxJS:

Rx.Observable.of(1, 2, Rx.Observable.of(3, 4), 5)
  .map(x => x.subscribe ? x : Rx.Observable.of(x))
  .mergeAll()
  .subscribe(x => console.log(x));
1
2
3
4
5

Note 1: Array.prototype.flatten does autoconversion of non-arrays to arrays, but Rx.Observable.prototype.mergeAll doesn't, so Observable.prototype.flatten doesn't have an equivalent in RxJS.

Note 2: Array.prototype.flatten doesn't deeply flatten, unless given an argument. By default (like you used), it would flatten one level only, hence shallow flatten. I know it's not going to affect the argument you just made about Array vs Observable, though, just wanted to point it out.

benjamingr commented 6 years ago

This divergence of behavior wouldn't occur if Observables would have the synchronous (recursive) scheduler by default, which is the case in RxJS:

@staltz might explaining that part? I realize that this is the default behavior in RxJS but to clarify - you mean this would be the behavior even if we defer notifications until after the subscription function is done like the issue suggests?

zenparsing commented 6 years ago

@staltz I've corrected the example to use Infinity, thanks!

staltz commented 6 years ago

you mean this would be the behavior even if we defer notifications until after the subscription function is done like the issue suggests?

Hi @benjamingr, I meant the divergence EcmaObservable vs Array occurs if we defer notifications, but the divergence does not occur in RxJS.

Actually, I just checked, and other schedulers in RxJS still preserve the expected order:

Rx.Observable.of(1, 2, Rx.Observable.of(3, 4, Rx.Scheduler.asap), 5, Rx.Scheduler.asap)
  .map(x => x.subscribe ? x : Rx.Observable.of(x, Rx.Scheduler.asap))
  .mergeAll()
  .subscribe(x => console.log(x));
1
2
3
4
5

So maybe there are hopes that Observable.prototype.flatten could maintain order as well?

benjamingr commented 6 years ago

Yeah I think flatten should maintain its behavior anyway :)

zenparsing commented 6 years ago

I think this line of thinking is begging the question; that question being whether Observable is isomorphic with Array. It doesn't have to be, just like we've decided that observer doesn't have to be isomorphic with generator.

In fact, we know that Observable isn't going to be isomorphic with Array, because some Array methods don't make sense for Observable (e.g. pop), and some of the Array method signatures don't make sense for Observable (e.g. reduce would likely return an Observable instead of a value or Promise for a value).

I think this just amounts to a good argument for not having a recursive, auto-flattening Observable.prototype.flatten.

ljharb commented 6 years ago

It can be conceptually like an array even if it doesn’t have the same API; an array is already some kind of conflation of a List and a Queue and a Stack, and if Observable only maps to one of those concepts, that’s still fine.

zenparsing commented 6 years ago

Observable probably isn't any of those things. I think it's more like Iterator, if we had methods on Iterator.prototype. And if we did have methods on Iterator.prototype, I don't think that we'd have a recursive auto-flattening method because of the "strings are iterable" issue.

benjamingr commented 6 years ago

I think this line of thinking is begging the question; that question being whether Observable is isomorphic with Array. It doesn't have to be, just like we've decided that observer doesn't have to be isomorphic with generator.

Isomorphism is effectively a "renaming" of entities that preserved behavior. Like how points in x/y like (2,2) and linear equations like 6x+1 are isomorphic because you can convert 6x+1 to the point (6,1), add it to another point - say (1,1) and get (7,1) and then turn it back to the polynomial 7x+1 and that would be like adding 1x+1 to it.

It is a morphism (structure preserving map operation) that is invertible. With categories isomorphism is the same thing (structure preserving inversifle function) that effectively means they are identical and just a "renaming of one another". In no case were observables and arrays isomorphic (and if so - under (for) what mappings and operations?).

What was claimed (and shown by Erik) is that observables can be created with a dual interface to that of iterables. In math in categories duality means a correspondence between categories that inverts the source and target of each mapping and the order of composing said mappings. Erik shows this here and it is effectively how observables were derived to begin with as far as I know :)

In either case - a dual interface isn't something we should necessarily do in JS and indeed the conclusion of discussions in the past few years weren't concluded in favor of such a duality.

benjamingr commented 6 years ago

Observable probably isn't any of those things. I think it's more like Iterator, if we had methods on Iterator.prototype. And if we did have methods on Iterator.prototype, I don't think that we'd have a recursive auto-flattening method because of the "strings are iterable" issue.

Well, if we want to make analogies and observable would be like an iterable and a subscription would be like an iterator.

benlesh commented 6 years ago

Simplification of consumer code by disallowing "early emission"

@zenparsing do you have an example of code that would be complicated by allowing observables to emit while the subscribe function is executing?

  1. Remove support for the "start" method

The start handler exists so the consumer can get a handle to the subscription before any values are emitted so they can opt out of any sort of firehose that might come from the observable. If we're to allow synchronous Observables, how would this work? Are you proposing that we make Observable subscription always async ala Promises?

Also, FWIW: An alternative to the start method is to just have the next handler accept a second argument of the Subscription.

zenparsing commented 6 years ago

@benjamingr

Sure. Consider the humble "filter":

function filter(source, filterFn) {
  return new Observable(observer => {
    return source.subscribe(
      val => {
        try {
          if (filterFn(val)) {
            observer.next(val);
          }
        } catch (err) {
          observer.error(err);
        }
      },
      err => observer.error(err),
      () => observer.complete()
    );
  });
}

This implementation is actually incorrect with the current observable spec. Can you spot why?

If filterFn throws during "early emission", the source observable won't be unsubscribed from (because the inner subscription has not been reified yet) and filterFn will continue to be called.

function filterFn(v) {
  console.log(v);
  throw new Error('oops');
}

filter(Observable.of(1, 2, 3, 4, 5), filterFn).subscribe({ error() {} });

// Logs: 1, 2, 3, 4, 5

With the current proposal, all consumers (including combinators) need to be aware of this footgun. Currently, I'm not seeing the benefit from "early emission" which would justify such an ever-present footgun.

Are you proposing that we make Observable subscription always async ala Promises?

See the OP, it's all there. The idea is that all notifications sent "early" (i.e. before subscribe has returned) are queued and delivered in a job.

An alternative to the start method is to just have the next handler accept a second argument of the Subscription.

That might make the footgun a little easier to deal with, but would not eliminate it. Also, it would break the type signature consistency between "observer" and its SubscriptionObserver wrapper.

staltz commented 6 years ago

@zenparsing I think the footgun is not necessarily due to early emission, but due to the fact that we can't naively execute subscribe logic after returning the subscription object, unless through a rearchitecture which is what RxJS does with its Subscriber architecture.

I'm trying to point out that the footgun is not synchronous emission, but it's the return Subscription architecture. In another architecture, callbags, it supports all the semantics that current RxJS does, but doesn't return subscription objects. Check this trick I use when converting iterators to callbag streams: https://github.com/staltz/callbag-from-iter/blob/master/index.js#L20

The problem with returning Subscription objects is that they are fundamentally meant for simple upstream communication to the producer, but we may only receive if after the producer has sent us all the data. I think a more versatile architecture removes returns altogether. That's the case in callbags: the producer is (a function) called by the consumer to do subscribe logic, then the consumer is (a function) called by the producer to do data delivery. There is a greet stage, where both sides will get to acknowledge the existence of each other, and only after that data delivery will happen. Currently in naively implemented Observables, the consumer may get data delivered to it before it has been fully greeted by the producer (represented by the Subscription object).

More about this here https://github.com/callbag/callbag and here https://github.com/callbag/callbag/blob/master/getting-started.md

I'm not insisting on just selling my idea, just want to point out how sync emission isn't necessarily the root cause it seems to be.

zenparsing commented 6 years ago

There is a greet stage, where both sides will get to acknowledge the existence of each other, and only after that data delivery will happen. Currently in naively implemented Observables, the consumer may get data delivered to it before it has been fully greeted by the producer (represented by the Subscription object).

I agree with this view of the problem. It doesn't really make sense to have both a return-based handshake protocol and allow notifications before the subscription is returned (i.e. before the handshake has completed).

We could get rid of the return portion of the handshake, for example by keeping start and returning undefined from subscribe, or by doing something more drastic like the callbag approach.

Or we can disallow notifications before the subscription is returned, as suggested here.

staltz commented 6 years ago

Keeping start on the Observer sounds good to me, and quite conservative (not so many changes to this proposal).

zenparsing commented 6 years ago

@staltz

If we move away from a return-based protocol, then we'll have to explore other options for registering the cleanup routine other than returning it from the producer setup function.

One option might be to require that the producer call start with an optional cleanup function:

function interval(ms) {
  return new Observable(observer => {
    const interval = setInterval(() => observer.next(), ms);
    observer.start(() => clearInterval(interval));
  });
}

And then for composition, start could receive an unsubscribe function (instead of a subscription object).

function map(source, mapFn) {
  return new Observable(observer => {
    source.subscribe({
      start(cancel) { observer.start(cancel) },
      next(v) {
        try { observer.next(mapFn(v)) }
        catch (e) { observer.error(e) }
      },
      error(e) { observer.error(e) },
      complete() { observer.complete() },
    });
  });
}

A more complex example with this API:

function combineLatest(...sources) {
  if (sources.length === 0)
    return Observable.of();

  return new Observable(observer => {
    let count = sources.length;
    let values = new Map();
    let cancels = [];

    observer.start(() => cancels.forEach(cancel => cancel()));

    sources.forEach((source, index) => {
      if (observer.closed)
        return;

      Observable.from(source).subscribe({
        start(cancel) {
          cancels.push(cancel);
        },
        next(v) {
          values.set(index, v);
          if (values.size === sources.length)
            observer.next(Array.from(values.values()));
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          if (--count === 0)
            observer.complete();
        },
      });
    })
  });
}

With this design, the subscription object goes away entirely (which is kind of nice, since it was just a wrapper around unsubscribe anyway).

staltz commented 6 years ago

👍 to this idea and to the removal of Subscription object as a wrapper.

zenparsing commented 6 years ago

What do you think should happen if the producer attempts to call "next", "error", or "complete" before calling "start"? We can't send an error to the consumer, since the handshake hasn't completed yet.

I suppose we could either:

  1. Throw an error to the caller ("Observer is not ready", or somesuch), or
  2. HostReport it (crashing the program on servers)

I'm leaning towards throwing the error, since the handshake didn't complete and there's nowhere else for the error to go.

staltz commented 6 years ago

I think that problem should be treated like any other violations of the Observable contract (such as calling next after complete).

The updated Observable contract should be, as regex, (start)(next)*(error|complete)?. How is (next)(start) a different violation than (complete)(next) regarding its aftermath?

staltz commented 6 years ago

By the way, with the removal of the subscription object, there still needs to be a way for the Observable constructor to get a reference to the cancel function, because it should call that when error or complete are sent to the consumer.

zenparsing commented 6 years ago

with the removal of the subscription object, there still needs to be a way for the Observable constructor to get a reference to the cancel function

Yes. I'm thinking that the wrapper observer's start method would grab a reference to the cleanup function before calling the wrapped observer's start method.

I think that problem should be treated like any other violations of the Observable contract (such as calling next after complete).

Good point. Another option that I didn't think of before: the wrapper observer could just automatically call start (as if by observer.start()) if a notification is sent before start is called.

zenparsing commented 6 years ago

I've created a branch of zen-observable using this modified "no-return" protocol, and here are my thoughts so far:

I think the last point means that we would have to drop Symbol.observable in favor of some other duck-typing mechanism (perhaps also symbol-based). One option might be to have a Symbol.subscribe method. For Observable, this would just be an alias of Observable.prototype.subscribe.

Thoughts?

staltz commented 6 years ago

Thumbs up for Symbol.subscribe, it looks like Symbol.iterator (which I would have named as Symbol.iterate, notice the verb symmetry with "subscribe").

I'm curious about Jafar's thoughts.

jhusain commented 6 years ago

Definitely interested in this idea. Continue to believe sync notification is the right approach. Agree with the idea that the push and pull together is the biggest source of pain. It’s worth noting that this moves the shape of Observable closer to that of Java’s proposed reactive sreams, which don’t return Subscription objects.

The main concern I have is the impact to the ergonomics of subscribe. This issue could be mitigated with takeUntil. It also may not be a big deal if we expect people to primarily use forEach. Overall I think there is a strong argument that returning a subscription is an attractive nuisance.

benlesh commented 6 years ago

Just to be clear... a "job" in this case refers to a "microtask"? If so, I think it will severely limit this Observable's usefulness as a primitive. It's very easy to "opt-in" to async, it's impossible to opt out. It also means that this Observable wouldn't be usable for modelling anything that could be synchronous, like EventTarget.

One of the arguments that seems to have come up a few times is that we don't want the Observer to be a different shape from the SubscriptionObserver. There's no real reason they can't be, they're two different types of objects with similarly named methods, that's all.

dinoboff commented 6 years ago

@benlesh EventTarget subscription doesn't need to be synchronous; when you register a listener, it won't be called until after the end of the current job. It would mainly affect emission from array, like Observable.from or observables that replay previous event.

zenparsing commented 6 years ago

@benlesh What do you think of the alternative presented in https://github.com/tc39/proposal-observable/issues/183#issuecomment-367417690?

With that API, we retain synchronous Observable.of and Observable.from, but remove the subscription object return value and instead pass an unsubscribe function through the "start" callback.

benlesh commented 6 years ago

I think the 99% use case of that will be trying to get that function out of that start handler so it can be used elsewhere:

class SomeFrameworkyComponent {
  onMounted() {
    const self = this;
    someObservable.subscribe({
      start(cancel) {
        self.unsubscribe = cancel;
      }
    });
  }

  onWillUnmount() {
    if (this.unsubscribe) this.unsubscribe();
  }
}

Overall, it's not very ergonomic for that use case (which, in my experience, is the most common use case).

Also, we had agreed in the past (I'd have to find the issue), that having an object with an unsubscribe function on it (or the like) was better than just a function, as it was better for readability.

It's going to be very rare that someone wants to even use start with the current API.

benlesh commented 6 years ago

@dinoboff

EventTarget subscription doesn't need to be synchronous; when you register a listener, it won't be called until after the end of the current job.

This isn't true.

const handler = () => console.log('clicked');

console.log('start');
document.addEventListener('click', handler);
document.dispatchEvent(new Event('click'));
document.removeEventListener('click', handler);
console.log('end');
staltz commented 6 years ago

What if cancel is also returned from subscribe?

class SomeFrameworkyComponent {
  onMounted() {
    this.unsubscribe = someObservable.subscribe({
      start(cancel) {
        // cancel === this.unsubscribe
      }
    });
  }

  onWillUnmount() {
    if (this.unsubscribe) this.unsubscribe();
  }
}

So that observer.start() is the primary and safe way of using cancel function but the returned value is an ergonomic shortcut.

That said, even the const self = this + self.unsubscribe = cancel usage isn't that much more unergonomic. In fact, the good thing is that it's contained. You only need to pay attention to it in onMounted, not elsewhere.

And yet another suggested usage:

class SomeFrameworkyComponent {
  onMounted() {
    someObservable.subscribe(this);
  }

  start(cancel) {
    this.cancel = cancel;
  }

  next(x) {
    // the observer's 'next' is this method
  }

  onWillUnmount() {
    if (this.cancel) this.cancel();
  }
}
zenparsing commented 6 years ago

There's no need to capture this as self.

class SomeFrameworkyComponent {
  onMounted() {
    someObservable.subscribe({
      start: (cancel) => this.unsubscribe = cancel,
    });
  }

  onWillUnmount() {
    if (this.unsubscribe) this.unsubscribe();
  }
}
benlesh commented 6 years ago

here's no need to capture this as self.

Completely fair. It's still not as intuitive. Given this is the same with the current proposal:

class SomeFrameworkyComponent {
  onMounted() {
    this.subscription = someObservable.subscribe();
  }

  onWillUnmount() {
    if (this.subscription) this.subscription.unsubscribe();
  }
}

What if cancel is also returned from subscribe?

That has been proposed a few times, and it was decided that having an explicit call named unsubscribed helped readability/searchability

zenparsing commented 6 years ago

@benlesh Using the returned subscription object may be more intuitive, but unfortunately it's also subject to the footgun described in https://github.com/tc39/proposal-observable/issues/183#issuecomment-367028072.

In the general case (where an observable might emit early) the consumer might not have access to the subscription at the time when it needs to unsubscribe.

benjamingr commented 6 years ago

@zenparsing An alternative, since this is just unsubscribing - we can be OK with it being impossible to unsubscribe synchronously or from events emitted synchronously. Honestly I don't think I've ever needed to unsubscribe synchronously. Unsubscribing being important typically implies there is expensive work to do or a resource being held onto which emission during subscriber isn't really a problem for.

I only have my own limited experience though.

staltz commented 6 years ago

@benjamingr it's not about explicit unsubscribes in a synchronous style, it's mostly about guaranteeing the observable contract is met, e.g. see the synchronous error example many comments above.

zenparsing commented 6 years ago

@benjamingr We should be able to unsubscribe synchronously so that users can easily build their own combinators using the Observable constructor, without falling into the "the subscription object hasn't been returned yet" footgun.

As @jhusain mentioned, takeUntil could also be used here. Assuming that we have AbortController and that AbortSignal implements Symbol.subscribe, we could do something like:

class SomeFrameworkyComponent {
  onMounted() {
    this.abortController = new AbortController();
    someObservable.takeUntil(this.abortController.signal).subscribe();
  }

  onWillUnmount() {
    if (this.abortController)
      this.abortController.abort();
  }
}
zenparsing commented 6 years ago

The above is actually a good reason to not have subscription objects at all, since they duplicate the functionality of cancel token-like things such as AbortController.

I'm wondering, since we're talking about changing the signature of subscribe anyway, if it's worth considering changing the name to observe (and Symbol.observe):