WICG / observable

Observable API proposal
https://wicg.github.io/observable/

Should the platform encourage stream-style handling of events? #56

Open littledan opened 10 months ago

littledan commented 10 months ago

I'm all for making event handling more ergonomic--it makes complete sense to me to have an object representing events from a target, that can be subscribed to multiple times, and disposed.

At the same time, I'm not sure if treating these events as a stream, with the iterator helper methods present to manipulate them, is the best model to encourage. I like the parallelism with existing APIs, but this proposal might be a bit too much. What if this API omitted the iterator helper APIs, and instead focused on subscribe, takeUntil, finally, and AbortSignal integration, to avoid this potential confusion?
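For concreteness, the reduced surface suggested here (subscribe, takeUntil, finally, and AbortSignal integration) could look roughly like the sketch below. This is a hypothetical `MiniObservable` written purely for illustration, not the proposal's implementation or spec text; the class name, the `fromEvent` helper, and the subscriber shape are all assumptions.

```javascript
// Hypothetical sketch only: not the proposal's implementation.
class MiniObservable {
  constructor(producer) {
    this._producer = producer; // called once per subscription
  }

  // observer is a function or { next, complete }; options.signal unsubscribes.
  subscribe(observer, { signal } = {}) {
    const { next, complete } =
      typeof observer === "function" ? { next: observer } : observer;
    if (signal?.aborted) return;

    // Each subscription owns a controller; aborting it runs all teardown.
    const controller = new AbortController();
    signal?.addEventListener("abort", () => controller.abort(), {
      once: true,
      signal: controller.signal, // drop this listener once we are torn down
    });

    this._producer({
      signal: controller.signal,
      next: (value) => {
        if (!controller.signal.aborted) next?.(value);
      },
      complete: () => {
        if (controller.signal.aborted) return;
        controller.abort(); // teardown first, then notify
        complete?.();
      },
    });
  }

  // Mirror the source until the notifier emits its first value.
  takeUntil(notifier) {
    return new MiniObservable((subscriber) => {
      const options = { signal: subscriber.signal };
      notifier.subscribe(() => subscriber.complete(), options);
      this.subscribe(
        { next: subscriber.next, complete: subscriber.complete },
        options
      );
    });
  }

  // Run the callback when the subscription ends, by completion or by abort.
  finally(callback) {
    return new MiniObservable((subscriber) => {
      subscriber.signal.addEventListener("abort", () => callback(), {
        once: true,
      });
      this.subscribe(
        { next: subscriber.next, complete: subscriber.complete },
        { signal: subscriber.signal }
      );
    });
  }

  // Stand-in for the proposal's target.on(type).
  static fromEvent(target, type) {
    return new MiniObservable((subscriber) => {
      target.addEventListener(type, subscriber.next, {
        signal: subscriber.signal,
      });
    });
  }
}

// Usage: listen for "tick" until "stop" fires, then clean up.
const target = new EventTarget();
const stopper = new EventTarget();
const log = [];

MiniObservable.fromEvent(target, "tick")
  .takeUntil(MiniObservable.fromEvent(stopper, "stop"))
  .finally(() => log.push("cleanup"))
  .subscribe((event) => log.push(event.type));

target.dispatchEvent(new Event("tick"));
stopper.dispatchEvent(new Event("stop"));
target.dispatchEvent(new Event("tick")); // ignored: listener already removed
// log is now ["tick", "cleanup"]
```

Even this small surface needs a per-subscription teardown signal threaded through every method, which is arguably the part a platform primitive would take off developers' hands.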

At Bloomberg, when we've tried to use this programming style through Observable libraries, we've had trouble understanding and maintaining the resulting code (and this was not in the context of Angular). The concern is less about whether it can be used well in some contexts, and more about whether it is something to encourage broadly.

Iterator helper-like methods on Observables encourage processing events in a pipeline of calculations, as opposed to writing the function for each event out more explicitly, as addEventListener currently does. This pipeline is sometimes used to implement aspects of reactive UI. If there are patterns popular with RxJS today which you are not encouraging, it could be useful to explain this and guide people towards patterns which you do want to encourage.

Most people who I've discussed this proposal with had similar feelings, amounting to something like [not quoting anyone in particular; it describes my feelings too so I may be biased]

Meh, we don't really like this programming model, but there isn't anything too horrible about this existing. Mostly, we've found usages of combinator libraries like this to be over-engineered and difficult to maintain, so we'll try to not use this too much.

When a new primitive is added to the web platform, it nudges everyone towards adopting that vocabulary and paradigm - and evaluating the higher-order consequences that arise is not intuitive. What might make us more confident is more experience using this new, proposed API. This could be done with some more experience using a JS implementation/polyfill, or even just some larger worked examples.

(Related issue: #41)

benlesh commented 9 months ago

I think the smaller set of methods is the real key. I've seen so much RxJS over the years that was bad because people felt they needed to use the available operators in RxJS to do everything.

Things I see that immediately lead me to question the quality of someone's code: Using groupBy, toArray, or mergeMap. Use of any of the more "exotic" operators, windowing operators, forkJoin, et al.

Funny enough, toArray() is in this proposal because it's mirroring the async iterator helpers, but honestly, even with async iterators, in my experience it's often a smell.

Ultimately, making it safer and easier to create complex coordinations of events and other things that might require teardown is a big win. Async code is inherently not easy. Managing async code and resources with different shaped teardowns and manual management is footgun central, RxJS or not.

let counter = 0;
input.addEventListener('input', async () => {
  // We need to make sure we only process
  // the lookahead we care about
  const id = ++counter;

  await sleep(1000);
  if (counter !== id) return; // EVERYONE FORGETS THIS PART

  const req = await fetch('/somelookup?q=' + input.value);
  if (counter !== id) return; // EVERYONE FORGETS THIS PART

  const data = await req.json();
  if (counter !== id) return; // EVERYONE FORGETS THIS PART

  updateLookaheadList(data);
})

vs (hypothetically)

input.on('input')
  .debounce(1000)
  .switchMap(() => fetch(`/somelookup?q=${input.value}`))
  .switchMap((req) => req.json())
  .forEach(updateLookaheadList)

littledan commented 9 months ago

Yeah, I definitely see the ability to define a "debounce" operator which operates on a logical stream of events to be a benefit of this proposal. A couple tangents based on the details of your example:

benlesh commented 9 months ago

Completely unrelated to this issue, but related to your comment about AbortSignal and fetch...

fetch has its own issues WRT cancellation. The way it's wired in there doesn't compose well. Cancellable promises would have been better for that, TBH. If the promise returned by fetch was "disposable" or "abortable" or whatever, it would be easier for anything like switchMap to compose that behavior in. (Imagine a promise that was also implementing Symbol.dispose that would teardown something related and reject the promise with an abort error)
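A minimal sketch of what such a "disposable" promise could look like, assuming a hypothetical `disposablePromise` helper (nothing like it exists on the platform today). The fallback for `Symbol.dispose` is also an assumption for runtimes that do not ship explicit resource management yet.

```javascript
// Assumption: fall back to a registered symbol where Symbol.dispose is missing.
const dispose = Symbol.dispose ?? Symbol.for("Symbol.dispose");

// Hypothetical helper: start(signal) begins the work and returns a promise.
// Disposing the result aborts the work and rejects with the abort reason.
function disposablePromise(start) {
  const controller = new AbortController();
  const promise = new Promise((resolve, reject) => {
    controller.signal.addEventListener(
      "abort",
      () => reject(controller.signal.reason),
      { once: true }
    );
    Promise.resolve(start(controller.signal)).then(resolve, reject);
  });
  promise[dispose] = () => controller.abort();
  return promise;
}

// Example work: a timer that is cleared when the signal aborts.
function sleep(ms, signal) {
  return new Promise((resolve) => {
    const id = setTimeout(resolve, ms);
    signal.addEventListener("abort", () => clearTimeout(id), { once: true });
  });
}

const pending = disposablePromise((signal) => sleep(1000, signal));
pending.catch((error) => console.log("rejected with", error.name));
pending[dispose](); // tears down the timer and rejects the promise
```

With a shape like this, an operator such as switchMap could call the dispose method on whatever the callback returned, without needing to know how to pass a signal into each individual API, for example `disposablePromise((signal) => fetch(url, { signal }))`.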

RxJS has fromFetch to solve a lot of the issues with composing cancellation from that API. The lack of a uniform API for promise-returning functions that accept an abort signal is a little problematic, IMO (which is one of the things Observable solves).

artalar commented 9 months ago

Hi!

I totally agree with the topic! The only way we could accept this whole new paradigm on the platform is if it is integrated well. I am sure that people will not be satisfied with the current built-in methods and will continue to import libraries with prototype patches (here we go again)! The exact set of methods is a sensitive topic, and I don't think there can be a good design. The only reasonable option is to reuse existing methods from other APIs, which is why iterator-helpers should be investigated and implemented first.

That was my objective opinion as an experienced developer and team lead. Now, I want to share my personal opinion as a library author.

Don't get me wrong, but for me, streams are a very specific and archaic programming model that can only handle a strict pipeline of operations well. When we have conditions in the pipeline, things get a lot more complicated.

One of the most complicated things about streams is that you can't put a debugger point on a line of the source code and see all the related variables. In a regular async function, you can easily inspect all the variables in the closures, but stream debugging is much more complicated and not user-friendly.

I have been searching for ways to solve this for many years, and at a certain point, I realized that we don't need additional decorators at all. We can use regular functions and native async/await if we accept cancellation context for all our computations. This problem is perfectly solved with AsyncContext. The code looks much simpler but remains more flexible for inspections and refactoring.

Here is an example with the upcoming proposal.

// Observables
input
  .on("input")
  .debounce(1000)
  .switchMap((promiseOptions /* ??? */) =>
    fetch(`/somelookup?q=${input.value}`, { signal: promiseOptions.signal })
  )
  .switchMap((response) => response.json())
  .forEach(updateLookaheadList);

// AsyncContext
input.oninput = concurrent(async () => {
  await sleep(1000); // "lodash" or whatever you want.
  const response = await bind(
    fetch(`/somelookup?q=${input.value}`, { signal: asyncAbort.get().signal })
  );
  updateLookaheadList(await bind(response.json()));
});

The utility implementations are pretty simple.

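// Controller for the most recent run; the initial one is only a placeholder.
let controller = new AbortController();
// AsyncContext.Variable takes an options bag in the AsyncContext proposal.
const asyncAbort = new AsyncContext.Variable({ defaultValue: controller });
// Returns an event handler; each call aborts the previous run.
function concurrent(cb) {
  return (...args) => {
    controller.abort();
    return asyncAbort.run((controller = new AbortController()), cb, ...args);
  };
}
// Awaits the promise, then throws if this run was aborted in the meantime.
async function bind(promise) {
  const result = await promise;
  asyncAbort.get().signal.throwIfAborted();
  return result;
}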

In the code above, the async option makes it much easier to inspect and debug. It's also easier to add conditions to the logic without using additional operators. You can use regular if, switch, custom pattern matching, or whatever you want. You only need to care about the async/Promise interface, which is already native.
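Since AsyncContext is still a proposal, here is a rough sketch of the same "latest run wins" idea that runs today by passing the signal explicitly instead of reading it from context. The explicit `signal` parameter and the `lookup` example are assumptions made for illustration; they are not part of the comment's original utilities.

```javascript
// Returns a handler; each call aborts the previous run and starts a new one.
// The task receives the run's AbortSignal as its first argument.
function concurrent(task) {
  let controller = new AbortController();
  return (...args) => {
    controller.abort();
    controller = new AbortController();
    return task(controller.signal, ...args);
  };
}

// Awaits the promise, then throws the abort reason if the run was superseded.
async function bind(signal, promise) {
  const result = await promise;
  signal.throwIfAborted();
  return result;
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical lookup: only the most recent call produces a result.
const lookup = concurrent(async (signal, query) => {
  await bind(signal, sleep(20)); // debounce-like delay
  return `results for ${query}`;
});
```

Calling `lookup("a")` and then `lookup("ab")` right away makes the first promise reject with the abort reason once its delay elapses, while the second resolves normally. The caller still has to decide what to do with that rejection.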

You could add sampling (takeUntil) easily.

input.oninput = concurrent(async () => {
  await promisifyEvent(input, 'blur')
  const response = await bind(
    fetch(`/somelookup?q=${input.value}`, { signal: asyncAbort.get().signal })
  );
  updateLookaheadList(await bind(response.json()));
});
function promisifyEvent(target, type) {
  const { promise, resolve, reject } = Promise.withResolvers();
  const unsubscribeEvent = onEvent(target, type, resolve);
  const unsubscribeAbort = onEvent(asyncAbort.get().signal, "abort", reject)
  return promise.finally(() => {
    unsubscribeEvent();
    unsubscribeAbort();
  });
}
function onEvent(target, type, cb) {
  target.addEventListener(type, cb);
  return () => target.removeEventListener(type, cb);
}

But what about reactivity? In the previous issue (#41), I mentioned that it is a challenging topic with numerous edge cases, and I believe it would be preferable to depend on the expertise of library authors.

Here is an example with my own library, which uses the explicit ctx as the first argument in all methods instead of the implicit AsyncContext.

The original example.

const socket = new WebSocket('wss://example.com');

function multiplex({ startMsg, stopMsg, match }) {
  if (socket.readyState !== WebSocket.OPEN) {
    return socket
      .on('open')
      .flatMap(() => multiplex({ startMsg, stopMsg, match }));
  } else {
    socket.send(JSON.stringify(startMsg));
    return socket
      .on('message')
      .filter(match)
      .takeUntil(socket.on('close'))
      .takeUntil(socket.on('error'))
      .map((e) => JSON.parse(e.data))
      .finally(() => {
        socket.send(JSON.stringify(stopMsg));
      });
  }
}

function streamStock(ticker) {
  return multiplex({
    startMsg: { ticker, type: 'sub' },
    stopMsg: { ticker, type: 'unsub' },
    match: (data) => data.ticker === ticker,
  });
}

const googTrades = streamStock('GOOG');
const googController = new AbortController();
const googSubscription = googTrades.subscribe({next: updateView, signal: googController.signal});

Reatom implementation (docs).

const socket = new WebSocket('wss://example.com')

const reatomStock = (ticker) => {
  const stockAtom = atom(null, `${ticker}StockAtom`)
  onConnect(stockAtom, async (ctx) => {
    if (socket.readyState !== WebSocket.OPEN) {
      await onEvent(ctx, socket, 'open')
    }
    socket.send(JSON.stringify({ ticker, type: 'sub' }))
    onEvent(ctx, socket, 'message', (event) => {
      // event.data is a JSON string, so parse it before reading the ticker
      const data = JSON.parse(event.data)
      if (data.ticker === ticker) stockAtom(ctx, data)
    })
    onEvent(ctx, socket, 'close', () => ctx.controller.abort())
    onEvent(ctx, socket, 'error', () => ctx.controller.abort())
    onCtxAbort(ctx, () =>
      socket.send(JSON.stringify({ ticker, type: 'unsub' })),
    )
  })

  return stockAtom
}

const googStockAtom = reatomStock('GOOG')
ctx.subscribe(googStockAtom, updateView)

You could share googStockAtom anywhere, subscribe to it, and react to its lifecycle. This whole code is 3.15 kB (gzip)!

I'm not advertising my library; I just want to show what options we have. Choosing the right primitive for such a huge platform is an important decision. I don't think Observable will be beneficial enough. In my opinion, it will only add mental load for newcomers.

I will repeat my original sentence: Could we describe the list of benefits, taxes, alternatives, and correlate them all together?

nin-jin commented 6 months ago

  input.on('input')
    .debounce(1000)
    .switchMap(() => fetch(`/somelookup?q=${input.value}`))
    .switchMap((req) => req.json())
    .forEach(updateLookaheadList)

This is absolutely incorrect code, divorced from real life, which can only be written by a junior. The correct code would look something like this:

  const value$ = input.on('input')
    .map( ()=> input.value )
    .distinctUntilChanged() // prevent refetch for same value
    .shareReplay()

  const detach$ = input.on('detach')

  const suggestLoadingCount$ = new ObservableState(0)
  const suggestLoading$ = suggestLoadingCount$
    .map( count => count > 0 )
    .distinctUntilChanged() // prevent rerender with same effect
    .shareReplay()

  const suggest$ = value$
    .takeUntil( ()=> detach$ ) // prevent side effects after detach
    .forEach( ()=> { ++ suggestLoadingCount$.value } ) // count concurrent fetches
    .debounce( 1000 )
    .switchMap( value => fetch( `/somelookup?q=${value}` ) )
    .switchMap( res => res.json() )
    .finally( ()=> { -- suggestLoadingCount$.value } ) // handle any task end
    .shareReplay()

suggestLoading$
    .forEach( renderLoading ) // inform user about loading

suggest$
    .forEach( renderLookaheadList )
    .catch( renderError ) // inform about issues

Manual control of data flows is an extremely difficult task. And I'm not sure that I wrote everything correctly. In particular, I did not complicate the code by canceling requests, which is also desirable. Automatic invariant-based data flow is a much simpler and more reliable abstraction:

  const result = atom( ()=> {
    sleep( 1000 ) // debounce, new run cancels previous with all subtasks including fetch
    return fetch( `/somelookup?q=${ input.value }` ) // refetch on value change
  } )
  const suggest = atom( ()=> result().json() ) // async functions converts to sync with SuspenseAPI

  const render = atom( ()=> {
    try {
      renderLookaheadList( suggest() ) // rerender on suggest deep change
    } catch( cause ) {
      if( Promise.like( cause ) ) renderLoading() // SuspenseAPI
      else renderError( cause ) // fast inform about issues
    }
  } )

  input.on( 'attach', ()=> render.start() ) // start auto render on attach
  input.on( 'detach', ()=> render.stop() ) // cancel all tasks on detach

I worked with Angular and RxJS for 3 months, and fixed a lot of childhood illnesses in their architecture. Stop forcing this overcomplicated solution already, please think about something more productive.

benlesh commented 6 months ago

😄 There's a lot of text up there. I just want to point a few basic things out:

The async/await examples above do indeed look simpler, but they lack true cancellation. Cancellation simply can't be composed through promises.

The concurrent implementation is clever, but not intuitive (which is in the eye of the beholder, I suppose), and it doesn't cover consumer cancellation, wherein the consumer is no longer interested in the stream at all. The fact there isn't a try-catch anywhere in there looking for error?.name === 'AbortError' tells me that it's going to show a lot of unhandled errors in the console at a minimum. Promises must resolve or reject. They can't be cancelled in the middle of their job like an observable can. 🤷‍♂️ Even in the multiplex example, there's no cancellation provided from what I can tell, where the observable version would be cleanly cancellable with an AbortSignal.
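To illustrate the missing piece described above, here is a minimal sketch of the kind of guard an async handler would need so that superseded runs do not surface as unhandled rejections. The `ignoreAbort` wrapper is a hypothetical name introduced for this example, not an existing API.

```javascript
// True for the errors produced by AbortSignal-based cancellation.
function isAbortError(error) {
  return error?.name === "AbortError";
}

// Hypothetical wrapper: swallows AbortError, rethrows everything else.
function ignoreAbort(handler) {
  return async (...args) => {
    try {
      return await handler(...args);
    } catch (error) {
      if (!isAbortError(error)) throw error;
      return undefined; // the run was cancelled, so there is nothing to report
    }
  };
}

// Example: a handler whose run has already been cancelled.
const cancelled = new AbortController();
cancelled.abort();

const handler = ignoreAbort(async () => {
  cancelled.signal.throwIfAborted(); // throws the default AbortError reason
  return "never reached";
});
```

Every promise-based handler needs a guard along these lines somewhere, which is part of the composition cost being pointed out here.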

Also: The current proposal doesn't have debounce, nor does it have switchMap, although that's being discussed.

I guess I'm not sure what the goal is, and maybe you need to state it more explicitly? Are you requesting that the proposal be stopped? Are you proposing that atoms be proposed instead? Are you just trying to highlight your library as an alternative?

benlesh commented 6 months ago

And just to be clear, I know that I'm the one who mentioned debounce and switchMap above. But the reality is we're probably not going to get both of those. switchMap might be more realistic, but we'll see.

domfarolino commented 3 weeks ago

What if this API omitted the iterator helper APIs, and instead focused on subscribe, takeUntil, finally, and AbortSignal integration, to avoid this potential confusion?

Certainly people aren't literally confusing Observables with things like Arrays, right? I think there's probably some good discussion to be had regarding the complexity or debuggability of code using Observables, but I'm not sure I understand where "confusion" would come in.


Now on to the complexity and debuggability issues with reactive-primitives like Observables...

At Bloomberg, when we've tried to use this programming style through Observable libraries, we've had trouble understanding and maintaining the resulting code (and this was not in the context of Angular). [...] Most people who I've discussed this proposal with had similar feelings, amounting to something like [not quoting anyone in particular; it describes my feelings too so I may be biased]

Meh, we don't really like this programming model, but there isn't anything too horrible about this existing. Mostly, we've found usages of combinator libraries like this to be over-engineered and difficult to maintain, so we'll try to not use this too much.

I think this is a very useful testimonial, and also something I've heard several times throughout the life of this proposal. I am having a little trouble figuring out what to make of it though. Perhaps it is because I'm not as much of a web developer these days, but I really want to understand very explicitly what the issues are here. Debuggability seems more obvious: if some deep operator has/throws an error, which is only surfaced far far away, in your distant error() callback, then I imagine figuring out exactly where in your pipeline of operators something went wrong is difficult if the error passed through doesn't have context on it. I would hope that enough context is passed through though!
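As a rough illustration of what "context on the error" could mean in userland, the sketch below wraps an operator callback so that a failure is rethrown with the operator's label and the original error attached as `cause`. The `withContext` helper and the labels are hypothetical; nothing here is part of the proposal.

```javascript
// Hypothetical helper: tags errors thrown by a pipeline callback with a label.
function withContext(label, fn) {
  return (...args) => {
    try {
      return fn(...args);
    } catch (cause) {
      // Error's `cause` option keeps the original error and its stack.
      throw new Error(`Operator "${label}" failed`, { cause });
    }
  };
}

// Example callback as it might be passed to .map() in a pipeline.
const parseMessage = withContext("map: parse message", (event) =>
  JSON.parse(event.data)
);

let surfaced;
try {
  parseMessage({ data: "{not json" });
} catch (error) {
  surfaced = error; // what a distant error() callback would receive
}
```

Here `surfaced.message` names the failing step and `surfaced.cause` is the original SyntaxError, so the far-away handler can at least tell which stage of the pipeline broke.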

Besides that, I would really like to try and understand the complexity issues people face. Specifically, I want to understand how code can just balloon in complexity when APIs of this kind are employed. I apologize if this sounds like a dumb ask, or sounds like me being a naive platform engineer in general.

Furthermore, it would be great to understand how representative this gripe is, or at least how to frame it in context. Given the surprising demand for this API from individual web developers, and its apparent popularity among library authors [1, 2, 3], I do have a little trouble figuring out how to weigh the few but recurring complaints I have heard that the Observable primitive promotes complexity when used in large codebases. Is most of the hype coming from people writing small hobby projects, and do most of the complaints / horror stories come from serious codebases that try and thread this primitive through everywhere? That seems too simplified, since I believe some of the userland Observable implementations are used by very large projects, inside very large companies, and the pattern has not been widely discredited.

But in any case, understanding the exact issues (i.e., specific patterns that lead to pain when using Observables, compared to patterns that lead to success) would help me understand if there are any specific patterns that we should indeed not be encouraging, which is one of @littledan's original asks:

If there are patterns popular with RxJS today which you are not encouraging, it could be useful to explain this and guide people towards patterns which you do want to encourage.