tc39 / proposal-observable

Observables for ECMAScript
https://tc39.github.io/proposal-observable/

Simplification of Observable API #201

Open benlesh opened 4 years ago

benlesh commented 4 years ago

Okay, I'm going to throw my hat back in and see if I can resurrect this a little.

What I'm going to propose is slightly different than the current proposal and different than RxJS, but I strongly feel it will work.

API

interface Observable<T> {
  new (
    initialization: (
      nextHandler: (value: T) => void,
      errorHandler: (err: any) => void,
      completeHandler: () => void, 
      signal: AbortSignal
     ) => void
  ): Observable<T>

  subscribe(
      nextHandler?: (value: T) => void,
      errorHandler?: (err: any) => void,
      completeHandler?: () => void, 
      signal?: AbortSignal
  ): void;

  forEach(nextHandler: (value: T) => void, signal: AbortSignal): Promise<void>

  first(signal: AbortSignal): Promise<T>;

  last(signal: AbortSignal): Promise<T>;
}

The idea is to remove the need to define Observer and Subscriber, as in other Observable implementations, and use simple functions instead. It also uses a cancellation token (à la AbortController/AbortSignal) instead of introducing a Subscription type.

I realize that AbortController and AbortSignal are not a part of JavaScript proper. However, I strongly feel JavaScript could use a cancellation primitive, and Observable, which is also a primitive, is not as useful without it.
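To make the proposed shape concrete, here is a minimal sketch of the constructor and subscribe described above. It assumes a global AbortController (browsers, Node 15+), gives each subscription its own child controller linked to the caller's signal, and skips handlers passed as null. Those choices belong to this sketch, not to the proposal's reference implementation.

```javascript
// Illustrative sketch only: not the proposal's reference implementation.
// Assumes a runtime with a global AbortController (browsers, Node 15+).
class Observable {
  constructor(initialization) {
    this._initialization = initialization;
  }

  subscribe(nextHandler, errorHandler, completeHandler, signal) {
    // Each subscription gets its own controller; the producer only ever sees
    // this child signal, never the caller's signal directly.
    const controller = new AbortController();
    const onOuterAbort = () => controller.abort();
    if (signal) {
      if (signal.aborted) return;
      signal.addEventListener("abort", onOuterAbort, { once: true });
    }

    // Ends the subscription and detaches from the caller's signal.
    const close = () => {
      if (signal) signal.removeEventListener("abort", onOuterAbort);
      controller.abort();
    };

    // Guarded handlers: nothing is delivered once the subscription is closed,
    // and error/complete can each take effect at most once. Null handlers are
    // skipped; this sketch silently drops errors that have no error handler.
    const next = (value) => {
      if (!controller.signal.aborted && nextHandler) nextHandler(value);
    };
    const error = (err) => {
      if (controller.signal.aborted) return;
      close();
      if (errorHandler) errorHandler(err);
    };
    const complete = () => {
      if (controller.signal.aborted) return;
      close();
      if (completeHandler) completeHandler();
    };

    try {
      this._initialization(next, error, complete, controller.signal);
    } catch (err) {
      // A throwing initializer is reported as an error notification.
      error(err);
    }
  }
}
```

Because the producer only sees the child signal, the listener on the caller's signal is removed as soon as the subscription errors or completes.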

Defining an observable instance

Below is the simplest use case for an observable. A synchronous set of values.

  test("should at least work", () => {
    const source = new Observable((next, error, complete, signal) => {
      next(1);
      next(2);
      next(3);
      complete();
    });

    let results = [];

    source.subscribe(value => results.push(value), null, () =>
      results.push("done")
    );

    expect(results).toEqual([1, 2, 3, "done"]);
  });

Handling of "firehose" synchronous data

With a cancellation token, like AbortSignal, handling synchronous firehoses and stopping them due to external unsubscription becomes a bit more intuitive than it was with previous designs, IMO:

 test("should handle firehose", () => {
    let loops = 0;
    const source = new Observable((next, err, complete, signal) => {
      for (let i = 0; i < 1000000000 && !signal.aborted; i++) {
        next(i);
        loops++;
      }
      // this will noop due to protections after abort below
      // which is "unsubscription".
      complete();
    });

    const controller = new AbortController();
    const results = [];

    source.subscribe(
      value => {
        results.push(value);
        if (results.length === 3) {
          // "unsubscribe"
          controller.abort();
        }
      },
      null,
      // complete should not be called, because of the
      // abort (unsubscription) above
      () => results.push("done"),
      controller.signal
    );

    expect(loops).toBe(3);
    expect(results).toEqual([0, 1, 2]);
  });

Concessions

first and last may not be necessary, and are more "nice to have"s for this type. Their primary use cases would be for wrapped HTTP calls, which, in a world where AbortSignals were prolific, should probably just be done via fetch.

Cons

There are a few cons to this design. Notably, from my perspective, it's not completely compatible with current popular designs. But I'm less worried about that than getting the appropriate primitives into the language.

Other thoughts

It's possible to have this implement Symbol.asyncIterator with a known behavior, like buffering all values internally until they are read. This, of course, comes with some potential issues around back-pressure and memory pressure, but I think that's easy to understand for most people who might use this type with for await.
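One way the buffering behaviour could look, as a sketch: toAsyncIterable is a hypothetical adapter (not part of the proposal) that works with anything exposing the proposed subscribe(next, error, complete, signal) shape. Values queue in an unbounded array until for await pulls them, which is exactly where the back-pressure and memory concerns come from.

```javascript
// Hypothetical adapter, not part of the proposal: turns anything with the
// proposed subscribe(next, error, complete, signal) shape into an async
// iterable, buffering every value until the consumer pulls it (unbounded).
function toAsyncIterable(observable) {
  return {
    [Symbol.asyncIterator]() {
      const buffer = []; // values pushed but not yet pulled
      const pending = []; // { resolve, reject } for pulls waiting on a value
      let done = false;
      let failed = false;
      let failure;
      const controller = new AbortController();

      // Hand buffered values, then completion or the error, to waiting pulls.
      const drain = () => {
        while (pending.length > 0) {
          if (buffer.length > 0) {
            pending.shift().resolve({ value: buffer.shift(), done: false });
          } else if (failed) {
            failed = false;
            done = true;
            pending.shift().reject(failure);
          } else if (done) {
            pending.shift().resolve({ value: undefined, done: true });
          } else {
            break;
          }
        }
      };

      observable.subscribe(
        (value) => { buffer.push(value); drain(); },
        (err) => { failed = true; failure = err; drain(); },
        () => { done = true; drain(); },
        controller.signal
      );

      return {
        next() {
          return new Promise((resolve, reject) => {
            pending.push({ resolve, reject });
            drain();
          });
        },
        // Called when a for await loop exits early: cancel the subscription.
        return() {
          controller.abort();
          done = true;
          buffer.length = 0;
          drain();
          return Promise.resolve({ value: undefined, done: true });
        },
      };
    },
  };
}
```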

Another con is that creating a "Subject", a common type used to compose with observables, becomes mildly challenging: it would need to be something that could be destructured into three functions and an abort signal. But again, I don't think that's really a concern for language authors; the community can take care of that.
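As a rough illustration of the community-side Subject, the sketch below returns next, error, and complete as closures so they can be destructured and handed to subscribe as plain functions. createSubject is a hypothetical name, and in this version late subscribers arriving after completion receive nothing.

```javascript
// Hypothetical userland Subject for the proposed positional API (a sketch,
// not part of the proposal). next/error/complete are closures rather than
// methods, so they can be destructured and passed around as plain functions.
function createSubject() {
  const observers = new Set();
  let stopped = false;

  const next = (value) => {
    if (stopped) return;
    for (const observer of [...observers]) observer.next(value);
  };
  const error = (err) => {
    if (stopped) return;
    stopped = true;
    for (const observer of [...observers]) observer.error(err);
    observers.clear();
  };
  const complete = () => {
    if (stopped) return;
    stopped = true;
    for (const observer of [...observers]) observer.complete();
    observers.clear();
  };

  const subscribe = (nextHandler, errorHandler, completeHandler, signal) => {
    // Late subscribers (after error/complete) get nothing in this sketch.
    if (stopped || (signal && signal.aborted)) return;
    // Drop this observer when its subscriber aborts.
    const onAbort = () => observers.delete(observer);
    // Stop listening to the signal once the subject has terminated.
    const detach = () => {
      if (signal) signal.removeEventListener("abort", onAbort);
    };
    const observer = {
      next: (value) => { if (nextHandler) nextHandler(value); },
      error: (err) => { detach(); if (errorHandler) errorHandler(err); },
      complete: () => { detach(); if (completeHandler) completeHandler(); },
    };
    observers.add(observer);
    if (signal) signal.addEventListener("abort", onAbort, { once: true });
  };

  return { next, error, complete, subscribe };
}
```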

Links

I've tossed together a demo here.

Repo: https://github.com/benlesh/tc39-observable-proposal Codesandbox: https://codesandbox.io/s/tc39-observable-proposal-proposed-change-uxh4p

benlesh commented 4 years ago

NOTE: AbortSignal could be replaced with any other cancellation standard that lands, provided it's a token-type cancellation.

acutmore commented 4 years ago

I can see that this implementation does not have the semantics of the 'safe' SubscriptionObserver, i.e. the guarantee that calling nextHandler, errorHandler or completeHandler would never throw.

Is that because this issue should focus on just the API design and not other semantics, or because this new design would not come with that guarantee? Didn't want to presume either way.

zenparsing commented 4 years ago

Cool - we probably should have gone down this "function-first" road from the start, but we had a "no closure" bias, if I recall...

Arguably, the cancel token in this API would best be a "subclass" of observable (with a subscribe method, etc.).

benjamingr commented 4 years ago

It's alive!🥳

So what about https://github.com/tc39/proposal-emitter ?

gre commented 4 years ago

That's a great simplification. One downside I see is the need to remember the order of the params, especially for subscribe.

Another suggestion: maybe you don't need a signal, just a teardown function returned from the initializer, with subscribe returning an unsubscribe function:

const source = new Observable((next, error, complete) => {
  next(1);
  next(2);
  next(3);
  complete();
  return () => { /* teardown */ };
});
const unsub = source.subscribe();
unsub();

benlesh commented 4 years ago

@gre: The signal also serves as a way to check whether the subscription has been cancelled during synchronous emission. Consider the following:

const source = new Observable((next, error, complete) => {
  let i = 0;
  while (true) { next(i++); }
  return () => { /* doesn't matter what you do here, it's never reached */ };
});

const controller = new AbortController();

source.subscribe(
  value => {
    if (value > 4) controller.abort();
  },
  null,
  null,
  controller.signal
);

In the above, there's really no way to signal that we want to stop the while loop in the Observable. A token or signal solves that cleanly:

const source = new Observable((next, error, complete, signal) => {
  let i = 0;
  while (!signal.aborted) { next(i++); }
});

@zenparsing ... I'm game to explore other cancellation primitives (AbortController isn't the best, but it's a good start); using a subclassed Observable might be nice. The main thing is that it needs to be something we can:

  1. synchronously check to see if it's "closed" or "aborted" or whatever, for the firehose case.
  2. Create "children" of, so that operators like mergeMap, etc. can be implemented by the community.
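Requirement 2 can be met with plain AbortController today; the sketch below shows one way to do it (createChildController is a hypothetical helper, not a platform API). Aborting the parent cascades to every child, while aborting one child, for example when a single inner subscription in a mergeMap-style operator ends, leaves the parent and its siblings untouched.

```javascript
// Sketch of a "child" cancellation token built from plain AbortController.
// createChildController is a hypothetical helper, not a platform API.
function createChildController(parentSignal) {
  const child = new AbortController();
  if (parentSignal.aborted) {
    child.abort();
    return child;
  }
  // A parent abort cascades down to the child...
  const onParentAbort = () => child.abort();
  parentSignal.addEventListener("abort", onParentAbort, { once: true });
  // ...but aborting the child alone only detaches it from the parent,
  // so finished inner subscriptions don't pile up listeners on the parent.
  child.signal.addEventListener(
    "abort",
    () => parentSignal.removeEventListener("abort", onParentAbort),
    { once: true }
  );
  return child;
}
```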

benlesh commented 4 years ago

@benjamingr I've expressed my concerns and opinions about the emitter proposal here: https://github.com/tc39/proposal-emitter/issues/26#issue-506860476

gre commented 4 years ago

@benlesh I don't find this necessarily too verbose:

const source = new Observable((next, error, complete) => {
  let i = 0;
  let aborted;
  while (!aborted) { next(i++); }
  return () => {
    aborted = true;
  }
});

If the idea is to remove types like Observer and Subscription, why not simplify this too?

Moreover, your example seems to me the least common case; most of the time you would "clean up" things like timeouts or event listeners. For those, this is generally much simpler to me:

const source = new Observable((next, error, complete, signal) => {
  const interval = setInterval(next, 1000);
  return () => clearInterval(interval);
});

Now try to solve this with a signal: it sounds like you're going to need an event listener. Who cleans up the listener on the signal? Does it get called even when the observable was unsubscribed? Does it leak if you reuse a signal in many places? What if I consume my observable more than once?

benlesh commented 4 years ago

@gre that cannot possibly work, as it will enter an infinite loop before returning that cancellation function.

It's also not as uncommon as you might think: converting an array to an observable, creating a range observable, or converting an iterable (the iterable could even have side effects on each turn). All of these require the ability to check an existing flag of some sort to make sure we aren't looping unnecessarily.

gre commented 4 years ago

Good point, sorry, I missed half of the code. Please ignore the first part of my answer; the second part remains. @benlesh

gre commented 4 years ago

Yet I'm curious why your code isn't prone to the same infinite loop. I don't see how it can work in single-threaded JS 🤔

benlesh commented 4 years ago

@gre Sorry, I edited my other response. I guess I wasn't expecting you to respond so quickly.

benlesh commented 4 years ago

@gre The reason it works is that the signal provided was created before the for loop was ever entered. If any action that occurs in next (called in the body of the for loop) aborts the signal, the signal is flagged as aborted synchronously, which means that on the next pass through the loop it sees that it has been aborted and stops the loop.
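The behaviour this relies on is easy to check with a bare AbortController: abort() sets signal.aborted and dispatches "abort" listeners synchronously, before it returns. In the sketch below, the loop stands in for the observable's for loop, and the abort() call stands in for a next handler that unsubscribes.

```javascript
// abort() flips signal.aborted and runs "abort" listeners synchronously,
// before abort() returns, so a loop that re-checks the flag stops on its
// very next pass.
const controller = new AbortController();
const { signal } = controller;

let listenerRan = false;
signal.addEventListener("abort", () => {
  listenerRan = true;
});

let passes = 0;
for (let i = 0; !signal.aborted; i++) {
  passes++;
  // Stand-in for a next() handler that decides to unsubscribe.
  if (i === 4) controller.abort();
}
```

After the loop, passes is 5 and listenerRan is already true, with no tick in between.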

gre commented 4 years ago

OK, that's a great use case for the sync loop then. So now I'm curious how the second problem I mentioned (cleaning up listeners and timeouts) would be solved with a signal, such that it doesn't leak listeners and calls them exactly once when the observable is done (as is the case in RxJS). The fact that the signal comes from outside makes it hard to solve.

benjamingr commented 4 years ago

Would it make sense for RxJS to migrate to this API (experimentally)?


An observable is just a setter for a setter (basically why RxJS and things like MobX or Vue are fundamentally the same idea :] ). So if we want a much more minimal API, the following could work:

(By the way, if anyone other than Ben (who is probably familiar with the material) wants a good intro, I recommend the talk by @headinthebox, who authored Rx: https://channel9.msdn.com/Events/Lang-NEXT/Lang-NEXT-2014/Keynote-Duality )

type Iterable<T> = () => (() => T);
type Observable<T> = (observer: (arg: T) => void) => void;
// would have preferred being able to write `T => () => ()`

Then things like error handling and completion can be built on top of that. Your example could be:

  test("should at least work", () => {
    const source = (next) => { next(1); next(2); next(3); }

    let results = [];

    source(value => results.push(value));

    expect(results).toEqual([1, 2, 3]);
  });

After all, in regular RxJS (without the fluff), an Observable is just a wrapper around its subscribe function:

class Observable {
  constructor(subscribe) { this.subscribe = subscribe; }
}

Although to be fair I am fine with either API and I don't think that's why this proposal has stalled :]
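The "setter for a setter" idea can be sketched with nothing but functions. Here of and map are hypothetical helpers operating on the function-only Observable type above. They compose fine, but note that nothing signals completion, reports errors, or tears anything down.

```javascript
// "An observable is just a setter for a setter": in this sketch an observable
// is a function that takes a callback. of and map are hypothetical helpers.
const of = (...values) => (next) => {
  for (const value of values) next(value);
};

// map wraps the downstream callback; no completion, errors, or teardown.
const map = (source, project) => (next) =>
  source((value) => next(project(value)));

const results = [];
map(of(1, 2, 3), (x) => x * 10)((value) => results.push(value));
```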

SerkanSipahi commented 4 years ago

Hello @yyx990803 (Vue.js creator), would you like to get involved in the discussion? As far as I know, Vue.js has its own Observable implementation. It would be good if we could create a standard here with RxJS, MobX, Vue, and others (I don't know the others).

The discussion/proposals should not only come from the RxJS perspective!

benlesh commented 4 years ago

@benjamingr the "fluff" is actually what makes Observable an important primitive.

  1. Guarantees that "teardown" occurs on error, completion, and unsubscription (consumer disinterest).
  2. Guarantees that the next, error, and complete handlers can't be called out of order or otherwise unreasonably.
  3. Guarantees that unhandled errors are propagated in a way that makes sense and doesn't cause strange, hard to debug errors.
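A sketch of what those guarantees mean in code, using a hypothetical createSafeSubscriber helper (not an RxJS or proposal API). It assumes handlers are called first and teardown runs afterwards, and that an error with no error handler is rethrown on a later task rather than swallowed.

```javascript
// Hypothetical helper (not an RxJS or proposal API) sketching the guarantees:
//  1. teardown runs exactly once, on error, completion, or unsubscription;
//  2. nothing is delivered after error/complete/unsubscribe;
//  3. an error with no handler is rethrown asynchronously, not swallowed.
function createSafeSubscriber(handlers, teardown) {
  let closed = false;
  let tornDown = false;
  const runTeardown = () => {
    if (tornDown) return;
    tornDown = true;
    teardown();
  };

  return {
    get closed() {
      return closed;
    },
    next(value) {
      if (!closed && handlers.next) handlers.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true;
      try {
        if (handlers.error) handlers.error(err);
        else setTimeout(() => { throw err; }); // report, don't swallow
      } finally {
        runTeardown();
      }
    },
    complete() {
      if (closed) return;
      closed = true;
      try {
        if (handlers.complete) handlers.complete();
      } finally {
        runTeardown();
      }
    },
    unsubscribe() {
      closed = true;
      runTeardown();
    },
  };
}
```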

If I thought we could do this reliably and safely with ordinary functions, I wouldn't be arguing it belonged in the language, as we already have functions.

benlesh commented 4 years ago

> Cool - we probably should have gone down this "function-first" road from the start, but we had a "no closure" bias, if I recall...

@zenparsing, yeah, I recall reading something about a function-first approach before, but I think one of the problems there was that we also needed a way to stop the synchronous "firehose", and subscriber.closed provided that. With a cancellation token, we're no longer hindered by that. (And honestly, from a maintainer's perspective, cancellation tokens would clean up a reactive library considerably; there's a lot of dancing around the fact that subscriptions need to exist up front.)

acutmore commented 4 years ago

The Fetch API returns a Promise that rejects on signal.abort.

Should the promise returning methods in this proposal mirror that behaviour? Right now the promise remains in a pending state.

Perhaps the single Fetch example is not enough of a precedent to follow, but I can see myself being asked to explain the difference when training developers.
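For comparison, a forEach that mirrors fetch could look like the sketch below. If the signal aborts before the source completes, the promise rejects with an error named "AbortError" instead of staying pending. This forEach is a hypothetical standalone helper over the proposed subscribe(next, error, complete, signal) shape. It uses a plain Error with that name to stay portable; fetch itself rejects with a DOMException named "AbortError".

```javascript
// Hypothetical forEach that mirrors fetch: if the signal aborts before the
// source completes, the promise rejects with an error named "AbortError"
// instead of staying pending.
function forEach(observable, nextHandler, signal) {
  return new Promise((resolve, reject) => {
    const makeAbortError = () => {
      const err = new Error("The operation was aborted.");
      err.name = "AbortError";
      return err;
    };
    if (signal && signal.aborted) {
      reject(makeAbortError());
      return;
    }

    // Inner controller, so the source is also stopped when nextHandler throws.
    const inner = new AbortController();
    const onAbort = () => {
      inner.abort();
      reject(makeAbortError());
    };
    if (signal) signal.addEventListener("abort", onAbort, { once: true });
    const finish = () => {
      if (signal) signal.removeEventListener("abort", onAbort);
      inner.abort();
    };

    observable.subscribe(
      (value) => {
        if (inner.signal.aborted) return;
        try {
          nextHandler(value);
        } catch (err) {
          finish();
          reject(err);
        }
      },
      (err) => { finish(); reject(err); },
      () => { finish(); resolve(); },
      inner.signal
    );
  });
}
```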

appsforartists commented 4 years ago

I'm not a TC39 member, so I don't have enough context to know what makes this proposal easier/harder than the previous one.

As a user, I prefer Observer because named parameters have better usability than positional ones.

new Observable(
  // That's a lot to remember
  (next, error, complete, abort) => {}
)
// vs
new Observable(
  // I can use only the parts I need, and order doesn't matter
  ({ complete, next }) => {}
)

To your point, Subject is a very useful tool, and it's trivial when the shape of Observer is well-defined.

zenparsing commented 4 years ago

@benlesh In the interest of taking a fresh look at things, let me offer my perspective on the firehose issue.

I've never liked it, although I realize that users typically don't complain about it, given typical Observable usage scenarios. But in my experience with implementing combinators, it's a recurring footgun. The naive solution is typically wrong because of it.

In zen-observable (which has some usage, although certainly not anywhere near as much as RxJS), I ended up disallowing firehosing completely. Instead, the "firehosed" items get queued up, and "of" and "from" don't firehose (they wait a tick before sending). The change went in almost two years ago, and although I've gotten (awesome) bug reports over that time, I've never seen a single complaint about the queuing behavior.
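The idea of not firehosing can be sketched like this. This is not zen-observable's code; it assumes subscribe returns a cleanup function and uses queueMicrotask as the "tick". Because of defers its emissions, subscribe has already returned the cleanup function before any value arrives, so a next handler can cancel mid-sequence without a cancellation token.

```javascript
// Sketch (not zen-observable's code): `of` defers emission to a microtask,
// so subscribe() returns its cleanup function before any value is delivered.
function of(...values) {
  return {
    subscribe(next, error, complete) {
      let cancelled = false;
      queueMicrotask(() => {
        for (const value of values) {
          if (cancelled) return;
          if (next) next(value);
        }
        if (!cancelled && complete) complete();
      });
      // Cleanup function: stops any further emissions.
      return () => {
        cancelled = true;
      };
    },
  };
}
```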

If you avoid the firehose problem, then you can just return a cleanup function from the Observable init function. Then you can have a clean layering: Observables don't need to assume a cancellation primitive (other than functions of course), and cancellation tokens can be built on top of Observable.

acutmore commented 4 years ago

Personally, I quite like AbortController/AbortSignal, though I can see how that makes this proposal a little more difficult to digest on the Node side, since Node doesn't have those APIs yet.

I have forked the original CodeSandbox with an example of how cancellation could be done using Observables to cancel Observables, as @zenparsing already suggested (though I didn't add a subclass). I also added a few tests to cover the modifications.

https://codesandbox.io/s/tc39-observable-proposal-proposed-change-dqkqd

const results = [];

const source = new Observable((next, err, complete, takeUntil) => {
    let abort = false;
    takeUntil.subscribe(() => (abort = true));
    for (let i = 0; i < 100 && !abort; i++) {
        next(i);
    }
    complete();
});

let abort;
const abortWhen = new Observable(n => {
    abort = n;
});

source.subscribe(
    value => {
        results.push(value);
        if (results.length === 3) abort();
    },
    null,
    null,
    abortWhen
);

or a more real-life example:

const timeout = (time) => new Observable((next, err, complete, takeUntil) => {
    const id = setTimeout(() => {
        next();
        complete();
    }, time);
    takeUntil.subscribe(() => clearTimeout(id));
});

const onEvent = (elm, event) => new Observable((next, err, complete, takeUntil) => {
    elm.addEventListener(event, next);
    takeUntil.subscribe(() => elm.removeEventListener(event, next));
});

timeout(10 * 1000).subscribe(
    () => launchSatellite(),
    null,
    null,
    onEvent(document.getElementById('cancel'), 'click')
);

To make it work, I also needed to change subscribe to return an unsubscribe function.

yyx990803 commented 4 years ago

@SerkanSipahi thanks for tagging me, but Vue's "observable" is focused on a rather different problem domain, which is why we renamed the API to "reactive" in Vue 3. In particular, modeling the stream of changes over time and dealing with cancellation etc. is not a concern of Vue's reactivity system. I see the two systems as having slight overlap, but they can easily be used in a complementary fashion.

benjamingr commented 4 years ago

@benlesh

> @benjamingr the "fluff" is actually what makes Observable an important primitive.

Yeah I agree. I am just not convinced that the "fluff" is bad or that the original proposal was blocked on the fluff rather than the heavy lifting. I am perfectly content with a battle tested API over a more minimal one to be honest :]

(I am enjoying the mental exercise though!)

benjamingr commented 4 years ago

@yyx990803 it's actually exactly the same problem doing the exact same thing although I understand why it might not seem that way. That's the point the inventor of Rx makes in the talk I linked to above.

I know it might seem orthogonal but vue.observable and Rx are two sides of the same coin with observables just being setters for setters.

yyx990803 commented 4 years ago

@benjamingr I agree they share the same root at the theory level. But their specific designs are aimed at different end-user scenarios. So my point is that I'm not interested in expanding Vue's reactivity system to be more observable-like. I'd rather it stay minimal and serve its current purpose. Nor am I interested in making observables the default in the Vue system, due to the extra mental overhead they impose on users. Vue's goal would be making sure its own reactivity system can play nicely with 3rd-party observable systems.

benjamingr commented 4 years ago

@yyx990803 I think I am communicating poorly because I really don't think that it needs to be expanded.

Vue's reactivity system (like Knockout, MobX and any other getter/setter or proxy system) is very much already the same thing (not in the root or theory level but in practice) as RxJS observables just with better ergonomics and different goals.

That's what the talk I linked to by Erik shows - an observable is just a setter for a setter with error handling and completion semantics - it's the same thing :]

Regardless I commend your commitment towards a simple and user friendly API :] (which is why I use Vue in my own code)

staltz commented 4 years ago

What is actually the goal of the proposal? I've seen a lot of discussion and bikeshed around the API details, but what will this actually achieve? This repo's readme says:

> By offering Observable as a component of the ECMAScript standard library, we allow platforms and applications to share a common push-based stream protocol.

But callbacks are legitimately push-based stream primitives, and are widely used already. They just don't have a contract for next/error/complete semantics. I don't see a need for an Observable primitive in the language if that primitive won't be used everywhere where it makes most sense, e.g. HTTP or for a sequence of push messages (WebWorker postMessage/onmessage) or DOM events. We know the web can't afford a breaking change, so at best we would be adding alternative APIs to supplement the callback-based APIs. But libraries already work as supplements.

But this is a JS spec, not a Web spec. Perhaps Observable could be more useful as a primitive in Node.js? I don't think so, they are making good use of AsyncIterable over there for readable streams.

So – honestly – what's the point of this spec?

FireyFly commented 4 years ago

Wrt web and supplementing callback-based APIs with Observable ones, it seems to be the direction argued in the WHATWG issue on Observables. I would guess that if TC39 specifies a standard Observable interface, it would (hopefully) end up being used in Web specs and APIs in the future too.

gre commented 4 years ago

@acutmore the fear I have with this pattern, as well as the one using AbortController, is what happens to the listeners left behind. They are leaking, aren't they? That can be a big deal if you use a lot of observables (I can call source.subscribe() many times).

const source = new Observable((next, error, complete, signal) => {
  const interval = setInterval(next, 1000);
  signal.addEventListener('abort', () => clearInterval(interval) /* here */);
});

Also, there should be a guarantee that this won't be called twice.

The only way I can see this being solved is if the signal param passed to the Observable function is NOT the one that comes from subscribe, but a new proxy object created individually per subscription, to provide these guarantees and be "released" when the observable ends. This would need to be specified if that's the idea.

Similarly, say you want to implement a race(...observables) function: would you have to create an intermediary signal for each one?

I understand that this doesn't solve the "firehose sync data" case but this is so much more straightforward.

const source = new Observable((next, error, complete) => {
  const interval = setInterval(next, 1000);
  return () => clearInterval(interval);
});

To avoid the confusion that subscribe's signal is not the same as the one passed to the Observable function, wouldn't this work with a simple callback?

const source = new Observable((next, error, complete, onUnsubscribe) => {
  const interval = setInterval(next, 1000);
  onUnsubscribe(() => clearInterval(interval));
});

acutmore commented 4 years ago

@gre in both the signal and observable implementations the value passed through is not the original but a 'child'. Doesn't create a leak.

I like the onUnsubscribe you suggest. There is only one valid way to use the value, so making that the only way it can be used is quite nice.
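A sketch of the onUnsubscribe variant, under these assumptions (not a spec): the producer registers teardown callbacks through onUnsubscribe; each one runs exactly once when the subscription ends by error, completion, or the returned unsubscribe function; and a callback registered after the subscription has already closed runs immediately, so nothing leaks.

```javascript
// Sketch of the onUnsubscribe variant (not a spec): the producer registers
// teardown callbacks and each one runs exactly once when the subscription
// ends by error, completion, or the returned unsubscribe function.
class Observable {
  constructor(initialization) {
    this._initialization = initialization;
  }

  subscribe(nextHandler, errorHandler, completeHandler) {
    let closed = false;
    const teardowns = [];

    // Returns true only for the call that actually closes the subscription.
    const close = () => {
      if (closed) return false;
      closed = true;
      for (const teardown of teardowns.splice(0)) teardown();
      return true;
    };

    const next = (value) => {
      if (!closed && nextHandler) nextHandler(value);
    };
    const error = (err) => {
      if (close() && errorHandler) errorHandler(err);
    };
    const complete = () => {
      if (close() && completeHandler) completeHandler();
    };
    // Registering after close runs the teardown immediately, so nothing leaks.
    const onUnsubscribe = (teardown) => {
      if (closed) teardown();
      else teardowns.push(teardown);
    };

    this._initialization(next, error, complete, onUnsubscribe);
    return () => {
      close();
    };
  }
}
```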

benjamingr commented 4 years ago

@staltz

> But this is a JS spec, not a Web spec. Perhaps Observable could be more useful as a primitive in Node.js? I don't think so, they are making good use of AsyncIterable over there for readable streams.

If you have interesting ideas for Observables in Node.js, we are definitely interested in learning about and considering them, but observables have no straightforward backpressure story and we prefer pull to push for most things at the moment.

I don't speak for the whole project but based on the sessions from the last few collaborator summits our consensus is that AsyncIterables are planned to have a more prominent role in the Node.js async story.

> But callbacks are legitimately push-based stream primitives, and are widely used already. They just don't have a contract for next/error/complete semantics.

Precisely, and that's the point I was trying to make here. The issue is that next/error/complete semantics are really important.

Our EventEmitter in Node.js has error semantics, and I think EventEmitter can be really confusing. If we could replace it and unify EventEmitter and EventTarget, we (Node) would be interested in that and in interoperating with it.

> So – honestly – what's the point of this spec?

Well, ideally the same way AsyncIterables are being used as "unified streams" in universal JavaScript allowing easy access to browser/node code with syntax support - I would love observables (or the events proposal - I also like that one) to be that for event emitters.

I am personally fine with either this proposal, the rx proposal or the events proposal for that purpose.

The goal (in my opinion - I don't speak for Ben obviously) is to have a unified push primitive that interoperates throughout JavaScript, with a contract to guide semantics.

benlesh commented 4 years ago

@staltz, I think the real place Observables will be most useful will be as a common API for pushed events, such as DOM events, SSE, WebSockets, Animations, etc. Generally, something better to use than the current EventTarget API.

The point of this issue was to show that we might not have to standardize quite as much, and maybe try to make this look a little more "JavaScripty", à la Promises, as concessions to get the primitive into the language.

Having a uniform API for event handling that can also be leveraged to create other streams of events, such that resource teardown from all of those things is also uniform, could have a very positive effect on developer ergonomics and reducing the likelihood of memory leaks in apps. Right now you have a situation where you need to keep N function references so you can removeEventListener with them, and you have to remember to call close on some types, or abort on others, maybe hold onto a number so you can clearWhatever with it later. It's a bit of madness. Not to mention there are different ways to subscribe to events from all of these things, and virtually no way to compose them in declarative ways. Getting observable into the standard would be a solid step in that direction. Any observable type, really. Just so long as it's lazy and it doesn't force multicasting.

I think for Node, async iterators will solve a solid number of usecases around file reads, and network IO. However for the frontend, allocating a promise-per-turn for things like mouse movements, or even web sockets, is a bit of madness IMO.

Callbacks, of course, will always work, but they're unfortunately not at all prescriptive, so it results in a Wild West of APIs, no uniformity, and a larger pit of failure for releasing resources.

benlesh commented 4 years ago

@gre If we want to go the route of having a returned teardown function, we only need a way to check to see if the subscription has been closed. (Whatever that way is, before it was on subscriber.closed, and I'm okay with that):

new Observable((next, error, complete, isClosed) => {
   let i = 0;
   while (!isClosed()) {
     next(i++);
   }
});

// or

new Observable(subscriber => {
  let i = 0;
  while (!subscriber.closed) {
    subscriber.next(i++); 
  }
});

(I actually prefer the latter of the two, honestly, but it might not be "javascripty" enough for the committee)

Another thing is that cancellation tokens would be a nicer fit for other parts of JavaScript, like promise-based APIs. Cancellation tokens also provide a way of cancelling the forEach means of subscription, which exists for compat with async/await.

acutmore commented 4 years ago

new Observable((next, error, complete, isClosed) => {
   let i = 0;
   while (!isClosed()) {
     next(i++);
   }
});

~Could never work. All false values are immutable.~

Edit: I am wrong. It’s a function not a boolean

mAAdhaTTah commented 4 years ago

@acutmore It's a function, not a boolean.

acutmore commented 4 years ago

@mAAdhaTTah whoops! Completely missed that somehow. Thanks for correcting me 🙏

acutmore commented 4 years ago

Adding another alternative that avoids currently non-standard Disposable and AbortSignal, but doesn't preclude extending Observable to include passing in a signal parameter in the future if they are added to the language.

Should using new Observable for the firehose case be considered a 'bug'? IMO the primary use case is async push. Instead, a static Observable.from(iterable) would be how to author that code.

Another observation is that events that trigger 'disposal'/'cancellation' can be split into two categories. Events that happen outside of the observable, and events that originate from the observable. There is no need to dispose on 'error', or 'complete' as the source has already stopped, only 'next' needs access to the disposable. These two categories can have two different ways of accessing dispose. The 'next' callback is passed dispose as an additional argument. External events can use the function returned from subscribe.

Con: forEach would not be disposable from an external event.

interface Iterable<T> {
    [Symbol.iterator](): Iterator<T>;
}

interface Observable<T> {
  static from<T>(iterable: Iterable<T>): Observable<T>;

  new (
    initialization: (
      nextHandler: (value: T) => void,
      errorHandler: (err: any) => void,
      completeHandler: () => void
     ) => () => void
  ): Observable<T>

   subscribe(
      nextHandler?: (value: T, dispose: () => void) => void,
      errorHandler?: (err: any) => void,
      completeHandler?: () => void, 
  ): () => void;
}

const fireHose = Observable.from(function* () {
  for (let i = 0; i < 1000000000; i++) {
    yield i;
  }
}());

fireHose.subscribe((v, dispose) => {
  console.log(v);
  if (v === 3) {
    dispose();
  }
});

const interval = new Observable(next => {
  const id = setInterval(next, 1000);
  return () => clearInterval(id);
});

const dispose = interval.subscribe(v => {
  console.log(v);
});
setTimeout(dispose, 3 * 1000);
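A minimal sketch of how the interface above could be implemented, under stated assumptions: next handlers receive a dispose function, subscribe returns the same function, and only the built-in from gets an internal isClosed check. That check is passed here as a fourth initializer argument that user code is not meant to rely on, and it lets from stop pulling from the iterable as soon as a subscriber disposes.

```javascript
// Sketch of the design above (assumptions, not a spec): next handlers receive
// a dispose function, subscribe() returns the same dispose function, and the
// initializer may return a teardown. Only the built-in `from` uses the
// internal isClosed check passed as a 4th initializer argument.
class Observable {
  constructor(initialization) {
    this._initialization = initialization;
  }

  static from(iterable) {
    return new Observable((next, error, complete, isClosed) => {
      for (const value of iterable) {
        next(value);
        // Returning out of for...of also calls the iterator's return().
        if (isClosed()) return;
      }
      complete();
    });
  }

  subscribe(nextHandler, errorHandler, completeHandler) {
    let closed = false;
    let teardown = null;
    const dispose = () => {
      if (closed) return;
      closed = true;
      if (teardown) teardown();
    };
    const next = (value) => {
      if (!closed && nextHandler) nextHandler(value, dispose);
    };
    const error = (err) => {
      if (closed) return;
      dispose();
      if (errorHandler) errorHandler(err);
    };
    const complete = () => {
      if (closed) return;
      dispose();
      if (completeHandler) completeHandler();
    };

    const result = this._initialization(next, error, complete, () => closed);
    if (typeof result === "function") {
      // If the subscriber already disposed during init, clean up right away.
      if (closed) result();
      else teardown = result;
    }
    return dispose;
  }
}
```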

If AbortSignal was added to the language:

interface Iterable<T> {
    [Symbol.iterator](): Iterator<T>;
}

interface Observable<T> {
  static from<T>(iterable: Iterable<T>): Observable<T>;

  new (
    initialization: (
      nextHandler: (value: T) => void,
      errorHandler: (err: any) => void,
      completeHandler: () => void,
      signal: AbortSignal
     ) => () => void
  ): Observable<T>

   subscribe(
      nextHandler?: (value: T, dispose: () => void) => void,
      errorHandler?: (err: any) => void,
      completeHandler?: () => void,
      signal?: AbortSignal
  ): () => void;
}

runarberg commented 4 years ago

A case for keeping the Observer and Subscriber objects—for passing into the constructor and the subscribe method respectively—is for future extensions. Say we want to add a finally handler in the subscription that will fire on any value (error or otherwise; similar to Promise.prototype.finally) we are kind of out of luck if we stick with positional arguments.

runarberg commented 4 years ago

I decided to take this proposal for a test so I created a bunch of operators based on the simplified API. These include race, chunks, and even catchError.

https://github.com/runarberg/tc39-observable-proposal/tree/operators/src/extras

Overall I must say it was quite pleasant working with this API; I found it beneficial not having to manage individual subscriptions in combination operators (such as flatMap).

Some minor downsides include creating a controller hierarchy to clean up upstream in take, and having to manage a set of controllers in race. But overall I found the benefits outweigh the downsides.

I do think this proposal can do without some of the included methods (Observable.prototype.{first,last,forEach}), as they can all easily live in userland libraries (particularly Observable.prototype.last). Maybe keep forEach, since most collections come with a forEach defined.

cedmandocdoc commented 4 years ago

@benlesh I think AbortController and AbortSignal are just an Observable with the context of cancellation. In code form, it looks like this:

const cancellationObservable = new Observable(next => next('CANCEL'));

This simply shows that for an Observable to know when to cancel, it should listen to another Observable that emits a cancellation. For instance, take a look at this pseudo-code:

interval(1000).subscribe(
  value => console.log(value),
  error => console.log(error),
  () => console.log('done'),
  fromEvent(button, 'click').mapTo('CANCEL') // cancellation observable
)

The subscribe function accepts an Observable that emits a cancellation, and then passes it to the interval Observable so it can listen for that cancellation.

This idea suggests that a reactive program requires a reactive Observable. It also shows that cancellation has never been a fundamental property of an Observable but an emergent property of some programs, simply because there could be other types of data that an Observable can react to. For instance, a timer Observable emits data in the form of time, but also listens to an external entity for when to propagate, when to pause, or when to stop completely. A pausable Observable would be easy to achieve with this pattern. Another type of Observable I can think of is a pull Observable: with this pattern, a producer can listen for a request telling it when to emit data.

For more information, I've written an article that attempts to redefine Observable at its core. It has examples of synchronous propagation and a speculative specification of Observable. Check out the link below: https://github.com/cedmandocdoc/redefining-observable

acutmore commented 4 years ago

@cedmandocdoc That API becomes less clean when you want to 'cancel' the observable based on the values from 'next'. e.g. I want to take the first 10 values only.

matthewwithanm commented 4 years ago

@cedmandocdoc Have you seen the rxjs takeUntil operator? It allows you to use another observable basically the same way as an AbortSignal. With a replay subject you can basically get the same result (an imperative cancelation API via subject.next()) and it's pretty ergonomic!
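For comparison, here is how a takeUntil-style operator could be expressed with the signal-based API from the top of this issue: the notifier simply completes the result, and completing aborts the one signal shared by both inner subscriptions. This is a sketch under assumptions; the Observable class, `takeUntil` and `createSubject` below are illustrative, not RxJS's implementation.

```javascript
// Minimal sketch of the function-based Observable proposed in this issue (not a spec).
class Observable {
  constructor(init) {
    this._init = init;
  }

  subscribe(next, error, complete, signal) {
    if (signal?.aborted) return;
    const inner = new AbortController(); // aborted on complete/error/outer abort
    signal?.addEventListener("abort", () => inner.abort(), { once: true });
    let closed = false;
    const finish = (handler, arg) => {
      if (closed || inner.signal.aborted) return;
      closed = true;
      try {
        handler?.(arg);
      } finally {
        inner.abort();
      }
    };
    this._init(
      (value) => {
        if (!closed && !inner.signal.aborted) next?.(value);
      },
      (err) => finish(error, err),
      () => finish(complete),
      inner.signal
    );
  }
}

// Hypothetical takeUntil: completing the result aborts `signal`, which tears down
// both the notifier subscription and the source subscription.
function takeUntil(source, notifier) {
  return new Observable((next, error, complete, signal) => {
    notifier.subscribe(() => complete(), error, undefined, signal);
    source.subscribe(next, error, complete, signal);
  });
}

// Hypothetical subject: a push source that forgets observers whose signal aborts.
function createSubject() {
  const observers = new Set();
  return {
    observable: new Observable((next, error, complete, signal) => {
      observers.add(next);
      signal.addEventListener("abort", () => observers.delete(next), { once: true });
    }),
    next: (value) => [...observers].forEach((fn) => fn(value)),
  };
}

const values = createSubject();
const stop = createSubject();
const got = [];
takeUntil(values.observable, stop.observable).subscribe(
  (value) => got.push(value),
  null,
  () => got.push("done")
);
values.next(1);
values.next(2);
stop.next(); // like subject.next() in the replay-subject approach
values.next(3); // ignored: the source subscription was already torn down
// got: [1, 2, "done"]
```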

I think @benlesh's proposal here is more about simplifying the core Observable API by aligning with the de facto standard cancelation mechanism (AbortSignal).

cedmandocdoc commented 4 years ago

@acutmore The idea of cancellation being an Observable is supported by the definition of an Observable itself, that is, basically an object that emits values. Cancellation is an action that comes from an entity and represents data in the context of canceling something. Who the actor is, and where or when the action takes place, doesn't matter; as long as it represents a cancellation, that is fine.

With all that in mind, we can implement different forms of cancellation. To list a few we have:

class Teardown extends Observable {
  constructor(producer) {
    super(producer);
    this.run = () => {};
  }

  subscribe(next, observable) {
    this.run = () => next('CANCEL');
    this.producer(next, observable);
  }
}

const teardown = new Teardown(() => {}); // pass an empty producer

fromArray([1, 2, 3]).subscribe(
  value => value === 2 && teardown.run(), // fire a cancellation from the observer
  teardown // pass the Teardown Observable
);

interval(100).subscribe(
  value => value === 2 && subject.next('CANCEL'), // fire a cancellation through the subject
  subject // pass the subject
);


All the examples show that cancellation could come from anywhere and take place at any time. This supports the idea that cancellation is indeed just an Observable with the context of canceling something.
cedmandocdoc commented 4 years ago

@matthewwithanm The takeUntil operator shows that cancellation takes place when an Observable emits something. I think this supports the idea that cancellation is just another form of Observable.

And as you have said it is pretty ergonomic and yes I agree with that.

But I think the difference between the pattern I've shown and the takeUntil operator is the implementation. As far as I know, takeUntil relies on the function returned by the producer to cancel things, whereas the pattern I've shown cancels the Observable through another Observable (with a specific context) by default.


I think @benlesh's proposal here is more about simplifying the core Observable API by aligning with the de facto standard cancelation mechanism (AbortSignal).

What do you mean by the de facto standard cancelation mechanism? Does this mean for the whole Javascript API? For example setInterval:

const controller = new AbortController();
const callback = () => {
  // logic
  controller.abort();
};
setInterval(callback, 1000, controller.signal)

I'm not sure about that, but if JavaScript embraces AbortController as the standard way to cancel all callback-based or async APIs, I would say there will be contradictions. Different types of async API have different forms of cancellation. For instance, aborting a fetch request rejects its promise with an AbortError, which contradicts others like setTimeout, which doesn't have an error callback.
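For callback APIs without an error channel, such as setInterval, an AbortSignal can be threaded through in userland today. A hypothetical sketch: `setIntervalWithSignal` is an invented name, and the built-in setInterval does not accept a signal (extra arguments to it are passed to the callback instead).

```javascript
// Hypothetical wrapper: clears the interval when `signal` aborts.
function setIntervalWithSignal(callback, ms, signal) {
  if (signal?.aborted) return; // never start work that is already cancelled
  const id = setInterval(callback, ms);
  signal?.addEventListener("abort", () => clearInterval(id), { once: true });
}

const controller = new AbortController();
let ticks = 0;
setIntervalWithSignal(
  () => {
    ticks += 1;
    if (ticks === 3) controller.abort(); // stop from inside the callback
  },
  10,
  controller.signal
);
```

Here aborting simply stops the timer. Unlike fetch, there is no rejected promise for anyone to observe, which is the asymmetry described in this comment.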

But with the pattern I've shown, we can generalize those inconsistencies. For example, we can create an AbortObservable that truly aborts an ObservableFetch.

class AbortObservable extends Observable {
  constructor(producer) {
    super(producer);
    this.run = () => {};
  }

  subscribe(next, observable) {
    this.run = () => next('ABORT'); // this emission will be observed by an ObservableFetch, which then aborts the request so that it rejects with an error
    this.producer(next, observable);
  }
}

const abort = new AbortObservable(() => {}); // pass an empty producer

fromFetch(options)
  .listen(
    value => console.log(value),
    abort
  );

abort.run(); // abort the fetch

This is not limited to aborting a fetch: you can also pass a merged Observable that combines an AbortObservable with a plain CancelObservable, which simply cancels an Observable.

merge(AbortObservable, CancelObservable)

I think the idea of cancellation being an Observable is simpler than the use of AbortController and AbortSignal, because it lets us generalize cancellation. Lastly, I think it is more primitive than AbortController, because you can build an AbortController-like object from an Observable. It all depends on how Observables should communicate with each other.
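The claim that an AbortController-like object can be built from an Observable can be sketched like this. It is a minimal, hypothetical sketch: the one-argument Observable and the `ObservableAbortController` name are assumptions, and the signal emits the string 'ABORT' as in the examples in this comment.

```javascript
// Minimal Observable in the "producer(next)" style used in this comment (assumption).
class Observable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(next) {
    this.producer(next);
  }
}

// Hypothetical AbortController look-alike: `signal` is an Observable that emits "ABORT" once.
class ObservableAbortController {
  constructor() {
    const listeners = new Set();
    this.aborted = false;
    this.signal = new Observable((next) => {
      if (this.aborted) next("ABORT"); // late subscribers still learn about the abort
      else listeners.add(next);
    });
    this.abort = () => {
      if (this.aborted) return; // idempotent, like AbortController.prototype.abort
      this.aborted = true;
      for (const fn of listeners) fn("ABORT");
      listeners.clear();
    };
  }
}

const controller = new ObservableAbortController();
const log = [];
controller.signal.subscribe((message) => log.push(`first: ${message}`));
controller.abort();
controller.abort(); // no second emission
controller.signal.subscribe((message) => log.push(`late: ${message}`));
// log: ["first: ABORT", "late: ABORT"]
```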

adamf92 commented 3 years ago

@benlesh Are you planning these changes for some future RxJS release? From my perspective, one of the most important features of the current Subscription implementation is the ability to create and manage aggregated/nested subscriptions. Is that possible with a signal/controller, or will it need an additional implementation? I'm creating a framework (Atomi-Q; I think it could be interesting to you, as it demonstrates the potential of RxJS as a base for the fastest DOM-updating solution) built around a concept called "Reactive Virtual DOM": a tree of objects like a Virtual DOM, but every dynamic/changeable node is an Observable. Thanks to that, every state change causes updates only in the connected nodes, which is many times faster than Virtual DOM diffing. So, when elements are created, I can easily add their subscriptions to the parent subscription with the add() method, and when an element is removed, unsubscribing its subscription automatically unsubscribes all of its children's subscriptions, so it works great with my concept. From the examples, I suppose that passing the same signal to multiple subscribe calls will allow unsubscribing multiple streams at once, but it would be nice to have it work the same way it currently does in RxJS.
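Nested teardown along the lines of Subscription.add() can be approximated with AbortController by chaining a child controller to a parent signal. A minimal sketch, assuming plain AbortController/AbortSignal; the `createChildController` helper is hypothetical. Newer runtimes also offer AbortSignal.any() for combining signals.

```javascript
// Hypothetical helper: a child controller that aborts whenever its parent signal aborts,
// similar in spirit to parentSubscription.add(childSubscription) in RxJS.
function createChildController(parentSignal) {
  const child = new AbortController();
  if (parentSignal.aborted) child.abort();
  else parentSignal.addEventListener("abort", () => child.abort(), { once: true });
  return child;
}

// A tree of "elements", each owning a controller for its subscriptions.
const root = new AbortController();
const childA = createChildController(root.signal);
const childB = createChildController(root.signal);
const grandchild = createChildController(childA.signal);

const tornDown = [];
for (const [name, controller] of [
  ["childA", childA],
  ["childB", childB],
  ["grandchild", grandchild],
]) {
  // Stands in for the teardown a subscription would register on its signal.
  controller.signal.addEventListener("abort", () => tornDown.push(name), { once: true });
}

childB.abort(); // removing one element tears down only its own subtree
root.abort(); // removing the root tears down everything that is left
```

One trade-off compared to Subscription: each child adds a listener to its parent's signal, so a long-lived parent with many short-lived children would want to remove those listeners when a child finishes on its own.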

dy commented 2 years ago

Sorry for revisiting the subject, but can someone please explain (again) or give a reference for:

  1. Why can't cancellation be left up to userland libraries to implement (like RxJS) instead of being part of the spec? Promises don't have a cancellation mechanism, and there's cancelable-promise (and other mechanisms) for that. AFAIK some committee members are not very fond of AbortController.
  2. What's the point of the forEach method here? Isn't that the same as subscribe(next)?

Also, a point to add on leaking listeners: I don't know whether FinalizationRegistry existed at the time of this discussion, but as tested in sube, it's possible to subscribe leak-free without requiring explicit unsubscription.

It feels like cramming too much into the spec is the main reason it's stalling. I just want to reference #210 as a minimal meaningful proposal.

benjamingr commented 2 years ago

Why can't cancellation be left up to userland libraries to implement (like RxJS) instead of being part of the spec? Promises don't have a cancellation mechanism, and there's cancelable-promise (and others) for that.

Cleanup is fundamental to observables, and without cancellation their expressiveness is significantly diminished. Also, Node and browsers (and others) have settled on AbortSignal, and Chrome is blocking any non-AbortSignal primitive suggestions in TC39.

What's the point of the forEach method here? Isn't that the same as subscribe(next)?

You can await it, which helps ergonomically.
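A forEach that returns a Promise can be layered on top of subscribe, which is what lets it be awaited. This is a sketch under assumptions: a bare-bones Observable following the API at the top of this issue, with forEach resolving on complete, rejecting on error or abort, and also rejecting (and unsubscribing) if the handler throws. The `doFoo` usage mirrors the shape of the example in this thread.

```javascript
// Minimal sketch of the function-based Observable proposed in this issue (not a spec).
class Observable {
  constructor(init) {
    this._init = init;
  }

  subscribe(next, error, complete, signal) {
    if (signal?.aborted) return;
    const inner = new AbortController(); // aborted on complete/error/outer abort
    signal?.addEventListener("abort", () => inner.abort(), { once: true });
    let closed = false;
    const finish = (handler, arg) => {
      if (closed || inner.signal.aborted) return;
      closed = true;
      try {
        handler?.(arg);
      } finally {
        inner.abort();
      }
    };
    this._init(
      (value) => {
        if (!closed && !inner.signal.aborted) next?.(value);
      },
      (err) => finish(error, err),
      () => finish(complete),
      inner.signal
    );
  }

  // Resolves when the observable completes; rejects on error, abort, or a throwing handler.
  forEach(nextHandler, signal) {
    return new Promise((resolve, reject) => {
      if (signal?.aborted) return reject(signal.reason);
      const controller = new AbortController();
      signal?.addEventListener(
        "abort",
        () => {
          controller.abort();
          reject(signal.reason);
        },
        { once: true }
      );
      this.subscribe(
        (value) => {
          try {
            nextHandler(value);
          } catch (err) {
            controller.abort(); // a throwing handler ends the subscription
            reject(err);
          }
        },
        reject,
        () => resolve(),
        controller.signal
      );
    });
  }
}

const updates = new Observable((next, error, complete) => {
  next("a");
  next("b");
  complete();
});

async function doFoo() {
  const log = ["setup"];
  await updates.forEach((value) => log.push(`update ${value}`));
  log.push("more work"); // runs only after the observable completes
  return log;
}
```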

dy commented 2 years ago

Cleanup is fundamental to observables, and without cancellation their expressiveness is significantly diminished.

Maybe I haven't met the use cases for that yet. It's just that for UI purposes (subscribable-things, templize, observable-value/value-ref), it's not apparent where that's useful: things get unsubscribed when the observable is garbage-collected. @zenparsing also mentioned that as a footgun. Do you possibly have a more elaborate example where it is a must?

You can await it, which helps ergonomically.

Could you elaborate? I just want to understand where it helps and why it's unavoidable for the spec. Recently I witnessed the opposite example: node collections can be detected by the presence of a forEach method and iterated over immediately (HTMLCollection, NodeList, Array), whereas an Observable is expected to be subscribed to, but pretends to be a collection.

benjamingr commented 2 years ago

things get unsubscribed when the observable is garbage-collected

I don't think that's an approach any committee would like and I think deterministic cleanup is important.
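Deterministic cleanup here means teardown runs synchronously when abort() is called, rather than whenever the garbage collector gets around to it. A minimal sketch with a bare-bones Observable (an assumption, mirroring the API at the top of this issue); the `resources` counter is a hypothetical stand-in for a real listener or socket.

```javascript
// Minimal sketch of the function-based Observable proposed in this issue (not a spec).
class Observable {
  constructor(init) {
    this._init = init;
  }

  subscribe(next, error, complete, signal) {
    if (signal?.aborted) return;
    const inner = new AbortController(); // aborted on complete/error/outer abort
    signal?.addEventListener("abort", () => inner.abort(), { once: true });
    let closed = false;
    const finish = (handler, arg) => {
      if (closed || inner.signal.aborted) return;
      closed = true;
      try {
        handler?.(arg);
      } finally {
        inner.abort();
      }
    };
    this._init(
      (value) => {
        if (!closed && !inner.signal.aborted) next?.(value);
      },
      (err) => finish(error, err),
      () => finish(complete),
      inner.signal
    );
  }
}

const resources = { open: 0 };
const source = new Observable((next, error, complete, signal) => {
  resources.open += 1; // stands in for addEventListener, a socket, a timer, etc.
  signal.addEventListener("abort", () => { resources.open -= 1; }, { once: true });
  next("ready");
});

const controller = new AbortController();
source.subscribe(() => {}, null, null, controller.signal);
const openBeforeAbort = resources.open;
controller.abort();
const openAfterAbort = resources.open;
// openBeforeAbort: 1, openAfterAbort: 0 (cleanup ran before abort() returned)
```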

Could you elaborate?

Sure, 90% of the time when writing code that uses both observables and promises I have something like:

async function doFoo() {
  // setup page
  await observable.forEach(() => {
    // update page
  });
  // do more work
}

Though I guess this could also be done by implementing Symbol.asyncIterator (which is probably more spec work?)
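Implementing Symbol.asyncIterator on top of subscribe could look roughly like this. A hypothetical sketch: `toAsyncIterable` is an invented helper, the Observable is a bare-bones assumption mirroring the API at the top of this issue, values are buffered without any backpressure, and breaking out of the loop aborts the subscription.

```javascript
// Minimal sketch of the function-based Observable proposed in this issue (not a spec).
class Observable {
  constructor(init) {
    this._init = init;
  }

  subscribe(next, error, complete, signal) {
    if (signal?.aborted) return;
    const inner = new AbortController(); // aborted on complete/error/outer abort
    signal?.addEventListener("abort", () => inner.abort(), { once: true });
    let closed = false;
    const finish = (handler, arg) => {
      if (closed || inner.signal.aborted) return;
      closed = true;
      try {
        handler?.(arg);
      } finally {
        inner.abort();
      }
    };
    this._init(
      (value) => {
        if (!closed && !inner.signal.aborted) next?.(value);
      },
      (err) => finish(error, err),
      () => finish(complete),
      inner.signal
    );
  }
}

// Hypothetical adapter: buffer pushed values and hand them out on demand.
async function* toAsyncIterable(source) {
  const controller = new AbortController();
  const buffer = [];
  let done = false;
  let failed = false;
  let failure;
  let wake = null; // resolves a pending wait when something arrives
  const notify = () => {
    if (wake) {
      wake();
      wake = null;
    }
  };
  source.subscribe(
    (value) => { buffer.push(value); notify(); },
    (err) => { failed = true; failure = err; notify(); },
    () => { done = true; notify(); },
    controller.signal
  );
  try {
    while (true) {
      if (buffer.length > 0) {
        yield buffer.shift();
        continue;
      }
      if (failed) throw failure;
      if (done) return;
      await new Promise((resolve) => { wake = resolve; });
    }
  } finally {
    controller.abort(); // `break` in for-await lands here and unsubscribes
  }
}

Observable.prototype[Symbol.asyncIterator] = function () {
  return toAsyncIterable(this);
};

const letters = new Observable((next, error, complete) => {
  next("x");
  next("y");
  next("z");
  complete();
});

async function collect() {
  const out = [];
  for await (const letter of letters) {
    out.push(letter);
    if (letter === "y") break;
  }
  return out;
}
```

The unbounded buffer is the main design question with this approach: a spec version would have to decide what happens when the producer is much faster than the consumer.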


I'd like to emphasize the observable proposal isn't actually blocked on either of those things though. It's not that cleanup or forEach went to the committee and the proposal got blocked on them - the proposal just doesn't have an active champion and hasn't been presented to tc39 or actively worked on in a long while.