WICG / observable

Observable API proposal
https://wicg.github.io/observable/

Reconsider unpredictable hot/cold and async/sync behavior #170

Open esprehn opened 3 weeks ago

esprehn commented 3 weeks ago

The RxJS-inspired API creates a situation where Observables sometimes have side effects upon subscription (hot vs cold), and sometimes emit synchronously and sometimes asynchronously. If you have a function that takes an Observable as an argument, you have no predictability if it will have a side effect upon subscription (is it safe to call it twice?), and no predictability if the emit will happen immediately when subscribing or later in a microtask.

This is a manifestation of Releasing Zalgo: https://blog.izs.me/2013/08/designing-apis-for-asynchrony/ https://blog.ometer.com/2011/07/24/callbacks-synchronous-and-asynchronous/
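A minimal sketch of the hazard described above. It assumes a hypothetical `TinyObservable` class (only the shape of an observable, not the proposed API) and a hypothetical cached `load()` function. Whether `next` runs before or after `subscribe()` returns depends on hidden cache state, and whether subscribing has a side effect depends on the same state.

```javascript
// Hypothetical minimal observable: it only wraps a subscribe function.
// This is a sketch of the shape, not the proposed Observable API.
class TinyObservable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(next) {
    this._subscribeFn({ next });
  }
}

const cache = new Map();
let fetchCount = 0; // counts the "expensive" side effect

// Hypothetical loader: emits synchronously on a cache hit,
// and after a microtask on a cache miss.
function load(key) {
  return new TinyObservable((subscriber) => {
    if (cache.has(key)) {
      subscriber.next(cache.get(key)); // runs while subscribe() is on the stack
      return;
    }
    fetchCount++; // side effect on every uncached subscription
    Promise.resolve(`value:${key}`).then((value) => {
      cache.set(key, value);
      subscriber.next(value); // runs after subscribe() has returned
    });
  });
}

// Records where the next() callback lands relative to subscribe().
function observeOrder(observable) {
  const order = ['before subscribe'];
  observable.subscribe(() => order.push('next'));
  order.push('after subscribe');
  return order;
}

// Cache miss: 'next' is appended only after the microtask runs.
const first = observeOrder(load('a'));
let second;
setTimeout(() => {
  // Cache hit: 'next' lands between 'before subscribe' and 'after subscribe'.
  second = observeOrder(load('a'));
}, 0);
```

The calling code is identical in both cases. Only the cache state decides which ordering the callback sees.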

Promises are the opposite of this:

This means if someone passes a Promise to you, all operations on it are predictable and safe.
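A small illustration of that guarantee using only standard Promise behavior. Even an already-settled promise runs `then()` callbacks after the current call stack finishes, and calling `then()` a second time does not re-run any work.

```javascript
const log = [];

// The promise is already settled before anyone calls then()...
const settled = Promise.resolve(42);

log.push('before then');
settled.then((value) => log.push(`first then: ${value}`));
// ...yet the callback never runs while then() is on the stack.
log.push('after then');

// A second then() has no extra side effect: it sees the same settled value.
settled.then((value) => log.push(`second then: ${value}`));

// Synchronously, log is ['before then', 'after then'].
// After the microtask queue drains, it is
// ['before then', 'after then', 'first then: 42', 'second then: 42'].
```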

With Observables the situation is different:

In large systems built on Observables I see this manifest often as:

Other Web APIs are also consistent today (e.g. MutationObserver, ResizeObserver, Promises): they never emit during subscription, and they keep observing an operation separate from executing it. The one place I'm aware of where this rule is violated is MessagePort#onmessage, where setting the handler starts the port, and that's generally been considered a mistake.

Context: I lead possibly the largest codebase built on rxjs in the world.

PowerKiKi commented 3 weeks ago

I share similar concerns, albeit on much smaller projects. Async vs sync can be annoying, but I feel hot vs cold is even more important.

I don't know what kind of API could solve those annoyances, but if there's a reasonable solution, it would be awesome to standardize on that instead.

I'd like to hear the thoughts of @benlesh about this, as I am sure that must have come up in the years of maintaining rxjs.

domfarolino commented 3 weeks ago

Thanks so much for filing this. Thoughts below.

Async vs sync callbacks

I understand and am sympathetic to the async vs sync concern here, but I'm not sure how to handle it without making things more complicated. From reading the supplied articles, it seems to me that the concern is not so much that subscriber.next() synchronously invokes the SubscriptionObserver#next() handler in the general case (this seems required if Observables are going to integrate with Events in any useful way). But the problem seems to be that it can do so before subscribe() actually returns. This is the synchronous firehose problem. Do I have the concern right?

I really think in the general case — long after subscribe() returns, for example — subscriber.next() needs to synchronously invoke the next handler, for Observables to be at all useful for events. So the only remedy I can imagine here is somehow deferring the effects of subscriber.next() iff subscribe() itself is on the stack. We could queue the invocation of the next() handler behind a microtask, as to avoid the sync firehose problem altogether. It's a little funny though, because then we end up in a situation where:

  1. With respect to subscribe(), Observables are "async": That is, no observer callback (like a next() handler) can run while subscribe() is on the stack. The API is effectively "async" in that it never behaves like list.forEach(callback); but...
  2. With respect to subscriber.next(), Observables can be async & sync: That is, most of the time when you call subscriber.next(), the relevant observer callback will run synchronously right then and there. Except if subscribe() is on the stack, then the effects of next() are deferred a tick
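A minimal sketch of the deferral idea described in the list above. `DeferringObservable` is a hypothetical name, and this is not the proposal's algorithm. `next()` is pushed back a microtask only while `subscribe()` is on the stack and is delivered synchronously afterwards. In this naive version, a value emitted during `subscribe()` can even arrive after a value emitted later, which shows how the sync/async split moves onto `subscriber.next()`.

```javascript
// Hypothetical sketch: defer next() only while subscribe() is on the stack.
class DeferringObservable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(next) {
    let inSubscribe = true;
    const subscriber = {
      next: (value) => {
        if (inSubscribe) {
          // subscribe() has not returned yet: push delivery back a microtask.
          queueMicrotask(() => next(value));
        } else {
          // Otherwise deliver synchronously, which event integration needs.
          next(value);
        }
      },
    };
    try {
      this._subscribeFn(subscriber);
    } finally {
      inSubscribe = false;
    }
  }
}

const log = [];
let emitLater;
const source = new DeferringObservable((subscriber) => {
  subscriber.next('during subscribe');
  emitLater = subscriber.next; // keep a handle to emit after subscribe() returns
});

log.push('before');
source.subscribe((value) => log.push(value));
log.push('after');
emitLater('after subscribe'); // delivered synchronously
log.push('end');
// Synchronously: ['before', 'after', 'after subscribe', 'end'].
// After microtasks, 'during subscribe' is appended last.
```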

This seems to just move the "this API is both sync and async" concern from subscribe() ➡️ subscriber.next() (and kin). That's not great. I think this is an artifact of the fact that Observables are lazy and Promises are not. That is, for Observables, subscribe() actually does stuff, where for Promises, then() just listens to stuff that's already underway. I see two ways of fixing this:

  1. Never invoke observer callbacks synchronously. I think this is a dealbreaker for event integration.
  2. Make Observables eager; you run the "subscriber callback" upon Observable creation instead, but only once and for all. You don't get access to any values until you finally subscribe().
    • What do we do with values the Observable has encountered in between creation and subscription? We can't queue them for later replay (Promises do this kinda replay, but for a single value). If we did, we'd have to replay them synchronously to the observer during subscribe(), but that brings us back to the concern that observer/subscribe callbacks may or may not be fired synchronously during subscribe(). If we replayed the values asynchronously to the observer, then that brings us to the other problem above where sometimes your next() handler is fired synchronously with respect to subscriber.next(), and sometimes it isn't. But that's the same problem that led us to consider making Observables eager, so I don't think it actually solves anything.

I'd love some more thoughts on this. The simplest thing here is (1) above, but I think it's a dealbreaker for event integration. The other options I've thought of are just as tricky/complicated as the current state of affairs, but have the added downside of breaking the Observable contract that any users today expect, for unclear benefit.

Hot vs cold (side effects)

I don't think I understand this concern very much, but I could just be missing something big. I'm considering the following comments:

have no predictability if it will have a side effect upon subscription

With Observables the situation is different:

  • Calling subscribe might be expensive (API request) or cheap (replay)

Our proposal here has no "shared" state across subscriptions, so if any call to subscribe() has side effects, then all calls to subscribe() have side effects. So I don't think there's anything in our proposal that we can address regarding hot vs cold. All Observables feel hot, to me.

Maybe the concern is the user code inside the subscriber callback has some "shared" state, where it doesn't perform the side effects more than once (i.e., for the 2nd+ subscription to the same Observable). Is that the concern? If so, I feel like Promise-returning APIs have the same concern. You could imagine some API that (a) does something super expensive, and (b) vends a Promise representing the value. If that API is called many times, you might get a Promise that just replays the cached shared value, as to not re-kick-off the expensive operation. That "tricky"/optimized user code is the same as the user code that would be in the subscriber callback, that's deciding to share/replay some state across many subscriptions to the same Observable.
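A sketch of this analogy with hypothetical names (`TinyObservable`, `payload$`, `fetchPayload`). In both cases it is the user code, not the primitive, that decides to replay a cached value instead of repeating the expensive work.

```javascript
// Hypothetical minimal observable (not the proposed API).
class TinyObservable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(next) {
    this._subscribeFn({ next });
  }
}

let observableRequests = 0; // counts the expensive side effect
let cachedPayload;          // user-level state shared across subscriptions

// The subscriber callback itself decides to replay instead of re-running.
const payload$ = new TinyObservable((subscriber) => {
  if (cachedPayload === undefined) {
    observableRequests++;
    cachedPayload = 'payload';
  }
  subscriber.next(cachedPayload);
});

let promiseRequests = 0;
let cachedPromise;

// The Promise-returning equivalent: the same caching lives in user code.
function fetchPayload() {
  if (cachedPromise === undefined) {
    promiseRequests++;
    cachedPromise = Promise.resolve('payload');
  }
  return cachedPromise;
}

const seen = [];
payload$.subscribe((value) => seen.push(value));
payload$.subscribe((value) => seen.push(value));
const samePromise = fetchPayload() === fetchPayload();
// observableRequests === 1, seen is ['payload', 'payload'],
// promiseRequests === 1, samePromise === true
```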

benlesh commented 2 weeks ago

It's funny, I actually spoke directly to @isaacs (the author of the Zalgo blog article) about this a long time ago (probably 2015) when I ran into him at a birthday party for a mutual friend in the Bay Area. "Zalgo" can be problematic for sure, but it's really not appropriate to worry about that for a reactive programming type like this. Observables are fairly low level.

More importantly though, this needs to work for EventTarget, and if it forces asynchrony (like a promise does) it doesn't work because of things like preventDefault() or stopPropagation():

form.when('submit').subscribe(e => {
  // If this is always async, it's too late.
  e.preventDefault();
});

element.when('click').subscribe(e => {
  // If this is always async, it's too late.
  e.preventDefault();
});

Further, EventTarget is capable of wiring up an event handler, dispatching an event to it, then removing the event handler, all synchronously, even now. And we need to be able to model that:

const handler = () => console.log('event handled!');

console.log('adding listener');
eventTarget.addEventListener('test', handler);
console.log('dispatching');
eventTarget.dispatchEvent(new CustomEvent('test'));
console.log('removing listener');
eventTarget.removeEventListener('test', handler);

// logs:
// "adding listener"
// "dispatching"
// "event handled!"
// "removing listener"

Where this should do the same:

const ac = new AbortController();
console.log('adding listener');
eventTarget.when('test').subscribe(
  () => console.log('event handled!'), 
  { signal: ac.signal },
);
console.log('dispatching');
eventTarget.dispatchEvent(new CustomEvent('test'));
console.log('removing listener');
ac.abort();
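A runnable approximation of the expected behavior. It assumes a hypothetical `subscribeToEvent()` helper that stands in for `eventTarget.when('test').subscribe(...)`, which only exists in the proposal, by passing the signal straight to `addEventListener()`. It also assumes an EventTarget that supports the `signal` listener option (current browsers and Node.js do). A plain `Event` replaces `CustomEvent` because no detail is needed.

```javascript
// Hypothetical stand-in for eventTarget.when(type).subscribe(next, { signal }).
function subscribeToEvent(target, type, next, { signal }) {
  target.addEventListener(type, next, { signal });
}

const logs = [];
const eventTarget = new EventTarget();
const ac = new AbortController();

logs.push('adding listener');
subscribeToEvent(eventTarget, 'test', () => logs.push('event handled!'), { signal: ac.signal });
logs.push('dispatching');
eventTarget.dispatchEvent(new Event('test'));
logs.push('removing listener');
ac.abort();

// Dispatching again after abort() reaches no listener.
eventTarget.dispatchEvent(new Event('test'));
// logs: ['adding listener', 'dispatching', 'event handled!', 'removing listener']
```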
benlesh commented 2 weeks ago

On the front of "hot" and "cold": Really what you're talking about is forced multicast. That comes with complications and overhead.

EventTarget#when will always be "hot" and force multicast. That's because it doesn't create the source of the events: the EventTarget existed before the observable was created.

It's plausible to make everything "hot" or "multicast" by default... however here are the complications:

  1. Every observable would need to do reference counting for its subscribers.
  2. What is the behavior at first subscription? Does the observable start then? Was it already started?
  3. On subscription 2+, if those subscribers are late, do we give them all of the previous values? The most recent value? or just make them wait for values?
  4. If subscription must be allowed to emit synchronously (see my previous comment), then if multiple subscribers subscribe to the same observable, even synchronously in a loop, we'll have to be able to send all previous values to the new subscribers, because the later subscribers would have missed them.
  5. When the observable is complete or errored does it need to be recreated? Or does it "reset" to its original state?
  6. When all subscribers abort or otherwise end their subscriptions, do we keep the source running? Do we abort it? Does it retain its previous values? Does it revert to its original state?

On the other hand, the "cold" observable, which is the most basic, low-level design for observable, is almost a fancy function with some guarantees. It can be used to compose any of the behaviors above quite easily.
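As a sketch of that composability, here is a hypothetical reference-counted `share()` built on a bare cold observable. `TinyObservable` and `share()` are illustrative names, not proposed APIs, and `subscribe()` returns an unsubscribe function for brevity where the proposal uses an AbortSignal. The sketch picks one answer to each question in the list above: the source starts on the first subscription, late subscribers only wait for new values, and the source is torn down when the last subscriber leaves.

```javascript
// Hypothetical minimal cold observable. subscribe() returns an unsubscribe
// function here for brevity; the proposal uses an AbortSignal instead.
class TinyObservable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }
  subscribe(next) {
    const teardown = this._subscribeFn({ next });
    return () => {
      if (typeof teardown === 'function') teardown();
    };
  }
}

// Hypothetical share(): one upstream subscription, reference-counted,
// multicast to every downstream subscriber.
function share(source) {
  const subscribers = new Set();
  let unsubscribeUpstream = null;

  return new TinyObservable((subscriber) => {
    subscribers.add(subscriber);
    if (subscribers.size === 1) {
      // First subscriber: start the source.
      unsubscribeUpstream = source.subscribe((value) => {
        for (const s of subscribers) s.next(value);
      });
    }
    return () => {
      if (!subscribers.delete(subscriber)) return; // already unsubscribed
      if (subscribers.size === 0) {
        // Last subscriber left: stop the source.
        unsubscribeUpstream();
        unsubscribeUpstream = null;
      }
    };
  });
}

// Usage: a cold source that counts how often it is started and stopped.
let starts = 0;
let stops = 0;
let emit = null;
const source = new TinyObservable((subscriber) => {
  starts++;
  emit = (value) => subscriber.next(value);
  return () => {
    stops++;
    emit = null;
  };
});

const shared = share(source);
const a = [];
const b = [];
const unsubscribeA = shared.subscribe((value) => a.push(value));
emit(1); // only A is subscribed
const unsubscribeB = shared.subscribe((value) => b.push(value));
emit(2); // late subscriber B only sees new values
unsubscribeA();
unsubscribeB(); // last one out: the source is torn down
// starts === 1, stops === 1, a is [1, 2], b is [2]
```

Other answers, such as replaying the latest value to late subscribers, would be different operators layered on the same cold primitive.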

Another thought: Most of the desire around forced multicast for observables generally comes from people wanting to use observables for state management. While observables can be used for state management, really, they're best at composing events. Something like Signals is better suited for state management.

esprehn commented 2 weeks ago

@domfarolino

I don't understand what you mean by "With respect to subscribe(), Observables are 'async'": the subscribe callback can emit synchronously (the firehose situation). That's what I'm pushing back on. The rest of the platform doesn't work that way, and it has the Zalgo issue.

If someone passes a Promise into a function there's nothing I can do to that Promise to end up back in sync land. Bluebird did actually have that behavior back in the day and it was quite painful and thankfully native Promises didn't carry it over.

An example fix for this would be for the subscribe callback to emit on a microtask. Then, even if the subscription is created synchronously, the callback always runs asynchronously, which avoids Zalgo.

@benlesh Events emitted by the browser will always have a microtask checkpoint before going back into native code. That means even if the handler's work runs in a microtask, you can still call preventDefault(), so your examples do work.

const a = document.body.appendChild(document.createElement('a'))
a.textContent = "Click me!";
a.href = "https://www.google.com/";
a.onclick = (e) => {
  Promise.resolve().then(() => e.preventDefault());
};

The situation that doesn't work is manually dispatched events (calling dispatchEvent()/click() from inside JS). That's a platform issue: there's no way to manually trigger a microtask checkpoint, and APIs behave differently when invoked from JS vs native code. The fix for that is to either expose manual checkpoints or add an EventTarget#dispatchEventAsync(): Promise<boolean> API that queues a native task, so there's a checkpoint before reading the result of the dispatch.
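A minimal reproduction of the script-dispatched case, using a plain EventTarget (available in browsers and Node.js). `dispatchEvent()` reports whether `preventDefault()` was called before it returned, so a call deferred to a microtask is not reflected in its result.

```javascript
// Listener that defers preventDefault() to a microtask, as forced
// asynchronous delivery would.
const deferredTarget = new EventTarget();
deferredTarget.addEventListener('submit', (e) => {
  queueMicrotask(() => e.preventDefault());
});

// Listener that cancels synchronously.
const syncTarget = new EventTarget();
syncTarget.addEventListener('submit', (e) => e.preventDefault());

// dispatchEvent() returns false only if the event was canceled before it returned.
const deferredNotCanceled = deferredTarget.dispatchEvent(new Event('submit', { cancelable: true }));
const syncNotCanceled = syncTarget.dispatchEvent(new Event('submit', { cancelable: true }));
// deferredNotCanceled === true (too late), syncNotCanceled === false
```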

domfarolino commented 2 weeks ago

I don't understand what you mean by With respect to subscribe(), Observables are "async":

Please see the sentence immediately preceding the one you quoted:

We could queue the invocation of the next() handler behind a microtask, as to avoid the sync firehose problem altogether. It's a little funny though, because then we end up in a situation where [...]

In other words, I'm describing (with the sentence "With respect to subscribe(), Observables are "async"") what would be the outcome if we "queued the invocation of the next() handler behind a microtask, as to avoid the sync firehose problem altogether".

benlesh commented 2 weeks ago

Events emitted by the browser will always have a microtask checkpoint before going back into native code

TIL, @esprehn, thanks! That's good news. However there are still issues with scheduling for every emitted value.

  1. Unlike promises, which will only have one value to worry about, an observable can have any number of values. That means we'd have to hold onto every value that was scheduled to be emitted on that microtask until the microtask fired. This is effectively an unbounded buffer. Not a big deal on its own, but not great given how widely observables are used across an app.
  2. It actually exacerbates the "sync firehose problem" by obscuring it from the developer: In the case of Observable.from(iterator), where the iterator is perhaps infinite, the user will be unable to unsubscribe from the iterator based off of what's been emitted, as the iterator will have to complete its synchronous iteration before the microtask to emit the values fires.
  3. Fundamentally, observable is the "dual" of iterable, and a primitive. The synchronous emission is completely by design and required. Problem 2 above is a symptom of us trying to deviate from the principles there. In more than a decade of this type, the lack of async emission has hardly ever been the issue. Usually, issues with observable arise from the fact that asynchrony is hard to reason about, and people tended to overuse the 100+ RxJS operators.

An example for problem number 2 above:

function* allIntegers() {
  let i = 0;
  while (true) yield i++;
}

const ac = new AbortController();

Observable.from(allIntegers()).subscribe(n => {
  console.log(n);
  if (n === 10) {
    ac.abort();
  }
}, { signal: ac.signal });
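To make that example runnable outside the proposal, here is a hypothetical `subscribeToIterable()` that stands in for `Observable.from(iterable).subscribe(...)`. It emits each value synchronously and checks the signal after every `next()` call, so aborting inside the callback stops the infinite generator right away. If delivery were deferred to a microtask instead, the `for...of` loop would have to finish before any callback ran, which never happens for `allIntegers()`.

```javascript
function* allIntegers() {
  let i = 0;
  while (true) yield i++;
}

// Hypothetical stand-in for Observable.from(iterable).subscribe(next, { signal }).
function subscribeToIterable(iterable, next, { signal }) {
  for (const value of iterable) {
    if (signal.aborted) break;
    next(value);
    // Checked synchronously after each emission; breaking out of for...of
    // also calls the generator's return(), closing it.
    if (signal.aborted) break;
  }
}

const ac = new AbortController();
const received = [];

subscribeToIterable(allIntegers(), (n) => {
  received.push(n);
  if (n === 10) {
    ac.abort();
  }
}, { signal: ac.signal });
// received holds 0 through 10, and the call has returned
```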

That seems like nonsense, sure, since it's taken to the extreme... but someone could write something that runs through a potentially large set of data when they only really want the first few items that match something, and then act on those:

window.when('message')
  .switchMap(e => {
    return Observable.from(e.data.rows)
      .filter(row => row.shouldStream)
      .take(3)
      .flatMap(row => streamRowData(row))
  })
  .subscribe(updateView, { signal })

Now... you could ABSOLUTELY just do the filtering and taking in the iteration part of this code. BUT observable is the dual of iterable... it should "just work" in roughly the same way no matter which type you choose. If observable schedules, however, it cannot.

The only other argument I've seen (and it's even an argument I've made myself) is that "observable should schedule because we don't get the cancellation mechanism back until after the subscribe call returns", which was (and is) problematic because the "synchronous firehose" can run before the consumer has the ability to cancel. RxJS has to do some dancing around that part of the design. But since with this design we're now only accepting the cancellation mechanism via AbortSignal as an argument to subscribe, that's no longer an issue.

And again... I'll go back to say that for more than a decade of very broad use, observable has not needed scheduled emissions. None of the implementations we cite in the README as examples of prior art add any additional scheduling.

I would expect to find new issues if we introduced this behavior, as it's a breaking change from what has been used for so long. Different unknowns and edge cases.

esprehn commented 2 weeks ago

In more than a decade of this type, the lack of async emission has hardly ever been the issue.

I disagree with this statement. It's not been my experience, which is why I filed this bug. I oversee a massive rxjs codebase maintained by thousands of engineers.

Taking a step back, I don't think "the sync firehose" makes sense on the web platform. Can you give a real example?

The unbounded buffer is blocking the thread, which is not something you should be doing on the web platform, since it prevents promises from resolving and keeps the thread from responding to user input. If you need an actual unbounded firehose, that sounds like a case for an async iterator inside the callback. It would interleave consumption and emission through microtasks, creating those opportunities to unsubscribe.

Taking your example:

window.when('message')
  .switchMap(e => {
    return Observable.from(AsyncIterable.from(e.data.rows))
      .filter(row => row.shouldStream)
      .take(3)
      .flatMap(row => streamRowData(row))
  })
  .subscribe(updateView, { signal })

does exactly what you want.

See also: https://github.com/tc39/proposal-async-iterator-helpers
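A sketch of the async-iteration approach using only standard async generators. The `toAsync()` and `firstMatches()` helpers are hypothetical, and the sketch does not use the `AsyncIterable.from` spelling above. Each value passes through an awaited step, so the consumer can break out early and other microtasks can run between values.

```javascript
// Wraps a sync iterable; each value is handed out through an awaited step.
async function* toAsync(iterable) {
  for (const value of iterable) {
    yield value;
  }
}

function* allIntegers() {
  let i = 0;
  while (true) yield i++;
}

// Collects the first `count` values matching `predicate`, then stops.
async function firstMatches(iterable, predicate, count) {
  const out = [];
  for await (const value of toAsync(iterable)) {
    if (predicate(value)) out.push(value);
    if (out.length === count) break; // also closes the upstream generators
  }
  return out;
}

firstMatches(allIntegers(), (n) => n % 2 === 0, 3).then((result) => {
  console.log(result); // [0, 2, 4]
});
```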

benlesh commented 1 week ago

Observable.from(AsyncIterable.from(e.data.rows))

This is a magic recipe though.

Forced asynchrony adds unnecessary complexity and overhead to the type. It introduces a lot of unknowns and questions and untested behaviors that will challenge the entire API, and I have yet to see why that would be beneficial beyond worries about "Zalgo" which aren't really appropriate for this type.

In a world with this brand new, never-been-tried forced async observable type, what happens here?

const source = new Observable((subscriber) => {
  let n = 0;
  while (subscriber.active) {
    subscriber.next(n++);
  }
});

source.take(10).subscribe(console.log);

In the untried new observable of forced async, it blocks the thread. With a synchronous observable, it does not.
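A sketch of the synchronous case, assuming a hypothetical `SyncObservable` loosely shaped like the proposal (`subscriber.next`, `subscriber.active`, an AbortSignal passed to `subscribe()`). It is not the real API. The `while` loop exits because `take()` aborts the upstream signal inside the same `next()` call that delivered the tenth value. If delivery were deferred to a microtask, the count in `take()` could never advance while the loop is running.

```javascript
// Hypothetical minimal synchronous observable; not the proposed API.
class SyncObservable {
  constructor(subscribeFn) {
    this._subscribeFn = subscribeFn;
  }

  subscribe(next, { signal } = {}) {
    const subscriber = {
      get active() {
        return !(signal && signal.aborted);
      },
      next(value) {
        if (this.active) next(value);
      },
    };
    this._subscribeFn(subscriber);
  }

  // Emits the first `count` values, then aborts the upstream subscription.
  // (This sketch does not forward a downstream signal upstream.)
  take(count) {
    return new SyncObservable((downstream) => {
      if (count <= 0) return;
      const ac = new AbortController();
      let seen = 0;
      this.subscribe((value) => {
        downstream.next(value);
        // Aborting here flips subscriber.active to false synchronously,
        // before the producer's while loop checks it again.
        if (++seen >= count) ac.abort();
      }, { signal: ac.signal });
    });
  }
}

const source = new SyncObservable((subscriber) => {
  let n = 0;
  while (subscriber.active) {
    subscriber.next(n++);
  }
});

const got = [];
source.take(10).subscribe((value) => got.push(value));
// got holds 0 through 9, and the producer's while loop has exited
```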

And what happens here? Are the async task logs interleaved with the map calls and the subscribe log? Do we need to queue the value and schedule the processing of every projected map value?

button.when('click')
   .map(() => {
     console.log('in map 1')

     queueMicrotask(() => {
       console.log('do something async');
     });

     return Date.now()
   })
   .map(() => {
     console.log('in map 2')

     queueMicrotask(() => {
       console.log('do something async');
     });

     return Date.now()
   })
   .subscribe(console.log)
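For comparison, here is the order you get with synchronous delivery today. The chain is approximated by a plain listener that calls two stand-in map functions in sequence, because EventTarget#when is not available outside the proposal. Constant return values replace `Date.now()` so the result is deterministic.

```javascript
const log = [];
const button = new EventTarget();

// Stand-ins for the two .map() projections above.
const map1 = () => {
  log.push('in map 1');
  queueMicrotask(() => log.push('async from map 1'));
  return 1;
};
const map2 = (x) => {
  log.push('in map 2');
  queueMicrotask(() => log.push('async from map 2'));
  return x + 1;
};

// Stand-in for .subscribe(console.log): the whole chain runs inside dispatchEvent().
button.addEventListener('click', () => log.push(`subscribe: ${map2(map1())}`));

button.dispatchEvent(new Event('click'));
// Synchronously: ['in map 1', 'in map 2', 'subscribe: 2'].
// After microtasks: [..., 'async from map 1', 'async from map 2'].
```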

There's just so many unknowns to this newly proposed type.

The scheduling allocations per turn alone would scale terribly, and I'm not even sure there's a mechanism for cancelling a microtask yet.

Even if we tried to be conservative about this change and said "Well, let's only schedule the subscription, not the emissions" to avoid all of the overhead and other weird issues with cancellation, there's still the question of subscriptions inside of methods. If you have source.map().filter().subscribe(), the call to subscribe() will schedule, then the subscription in filter() will schedule, then the subscription in map() will schedule, and finally, the subscription to source will schedule... So we've scheduled four times just to get the stream started.

Forcing asynchrony would be a huge mistake and will actually make things harder to reason about, not easier. And at the cost of additional memory footprint and processing time.

benlesh commented 1 week ago

I'm totally willing to die on this hill, honestly. Even the title of this thread unfairly characterizes the behavior of this very basic type as "unpredictable", when it's in fact completely predictable until you force it to schedule with other arbitrary microtasks.

benlesh commented 1 week ago

Thinking about this even more. If there was forced scheduling, testing observables becomes even more cumbersome.

it('should handle events via an observable', async () => {
  const target = new EventTarget();

  const results = [];

  target.when('test')
    .map(e => e.detail.value)
    .subscribe((value) => {
      results.push(value);
    });

  // wait for subscription to start.
  await Promise.resolve();

  target.dispatchEvent(new CustomEvent('test', { detail: { value: 'hi' } }));

  // wait for event to propagate
  await Promise.resolve();

  expect(results).toEqual(['hi']);
});

...which is even less ergonomic than addEventListener or really any other approach related to events, as far as testing goes.

Even if you use some harness to "click" a button, you'll have to wait a tick to know for sure the click happened. It's just more complicated.

benlesh commented 1 week ago

Now.. all of this said, if you really wanted to consume an observable with forced asynchrony, it implements Symbol.asyncIterator. So all you'd have to do is consume it in a for await loop. You could even have a simple lint rule that forced either that or await observable.forEach() to ensure that no one "released Zalgo", only in a way that doesn't force a complicated behavior on everyone.
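A sketch of what consuming through an async iterator implies. It uses a hypothetical `toAsyncIterable()` adapter over a push-based producer, not the proposal's actual `Symbol.asyncIterator` implementation. Every value goes through an awaited step, so the loop body never runs inside the producer's call stack. The adapter needs a buffer for values pushed faster than they are pulled, which is the unbounded-buffer cost raised earlier in the thread. It also omits `return()`, so breaking out of the loop does not stop the producer.

```javascript
// Hypothetical adapter: wraps a push-based producer (next/complete callbacks)
// in an async iterable that a consumer can read with `for await`.
function toAsyncIterable(produce) {
  return {
    [Symbol.asyncIterator]() {
      const buffer = []; // values pushed but not yet pulled
      let done = false;
      let wake = null;   // resolves a pending next() call

      produce({
        next(value) {
          buffer.push(value);
          if (wake) { wake(); wake = null; }
        },
        complete() {
          done = true;
          if (wake) { wake(); wake = null; }
        },
      });

      return {
        async next() {
          while (buffer.length === 0 && !done) {
            await new Promise((resolve) => { wake = resolve; });
          }
          if (buffer.length > 0) return { value: buffer.shift(), done: false };
          return { value: undefined, done: true };
        },
      };
    },
  };
}

// Usage: even a producer that emits synchronously is consumed asynchronously.
async function collect(iterable) {
  const out = [];
  for await (const value of iterable) out.push(value);
  return out;
}

collect(toAsyncIterable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
})).then((values) => console.log(values)); // [1, 2, 3]
```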