WICG / observable

Observable API proposal
https://wicg.github.io/observable/

Include `switchMap` operator? #52

Closed Yogu closed 3 months ago

Yogu commented 11 months ago

The explainer includes flatMap, but not switchMap. I think switchMap should be included because it is essential in a common pattern:

A simple example is a search bar that triggers a network request, but should cancel the previous one when the user types too quickly.

I don't think I have ever used the RxJS flatMap operator, but I'm using switchMap for the pattern described above all the time.


From @benlesh: I created a duplicate issue (#90) for this accidentally, so I'll include the use cases and text I posted there here:

switchMap was mentioned at TPAC as being an API of interest. It's a method that is especially useful for composing events in the browser because of its inherent cancellation. If API bloat is a concern, I would gladly remove toArray(), some(), find() et al, as those are minimally useful with observables.

Use cases are things like:

Use Case: Lookahead Search

In the below scenario, the result of the fetch would simply be ignored.

It's plausible that we could provide a signal along with the switchMap callback, as well. In fact, I'd almost encourage that, as it might make fetch more useful. But I think we'd want to discuss the ergonomics there, and that could be debated and added later. (For example, we might want to have it automatically swallow DOMExceptions named AbortError?)

const input = document.getElementById('search');
const resultList = document.getElementById('result-list');

input.on('input')
  .switchMap(async (e) => {
    const response = await fetch(`/search?q=${e.target.value}`);

    if (!response.ok) {
      console.warn(`Search response error: ${response.status} ${response.statusText}`);
      return;
    }

    return response.json();
  })
  .subscribe((results) => {
    resultList.innerHTML = results
      .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
      .join('')
  })
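The "result of the fetch would simply be ignored" behavior can be sketched without the proposed Observable API. This is a minimal illustration only: `createLatestOnly`, `begin`, and `complete` are hypothetical names, not part of the proposal, and the sketch models only the dropping of stale results. The superseded request itself would still run to completion.

```javascript
// Hypothetical helper (not part of the Observable proposal): models how a
// switchMap-style operator drops results from superseded inner work.
function createLatestOnly(deliver) {
  let latestId = 0;

  // Call begin() each time new work starts (e.g. on every 'input' event).
  return function begin() {
    const id = ++latestId;

    // Call the returned function when that piece of work finishes.
    return function complete(value) {
      if (id !== latestId) {
        return false; // Newer work has started since; ignore this result.
      }
      deliver(value);
      return true;
    };
  };
}

// Usage: two overlapping "searches"; only the most recent one is delivered.
const shown = [];
const begin = createLatestOnly((results) => shown.push(results));

const finishFirst = begin();  // user typed "a"
const finishSecond = begin(); // user typed "ab" before "a" responded

finishFirst(['apple']);       // stale, ignored
finishSecond(['abacus']);     // delivered
console.log(shown);
```

Here the first response arrives after a newer search has started, so only the second result list reaches the `deliver` callback.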

Use Case: Changing Connections

Below, every time the URL in a URL input changes, if it's a valid URL, it will disconnect from the previous WebSocket and connect to a new one.

function streamData(url) {
  return new Observable(subscriber => {
    const ws = new WebSocket(url);
    ws.onmessage = e => subscriber.next(e);
    subscriber.addTeardown(() => ws.close());
  })
}

const urlInput = document.getElementById('url-input');

urlInput.on('input')
  .filter(e => e.target.checkValidity())
  .switchMap(e => streamData(e.target.value))
  .subscribe(console.log);
bakkot commented 11 months ago

Yeah, the question of ordering is a little more interesting for observables than for iterators. I might honestly prefer to omit flatMap and instead only include concatMap and switchMap (or whatever other names), since flatMap has multiple valid interpretations for observables.

benlesh commented 11 months ago

In this implementation, flatMap is RxJS's concatMap, mostly because it intuitively follows the behavior of flatMap in other types. Although I need to check up on AsyncIterator helpers flatMap...

bakkot commented 11 months ago

The flatMap in async iterator helpers will be RxJS's concatMap.

tabatkins commented 11 months ago

Agreed, switchMap is one of the most important operators for doing a common (and very difficult by hand) ordering of multiple async operations.

(I think I disagree on dropping flatMap; concatMap is the most straightforward translation of Array.flatMap among the four RxJS fooMap operations: it turns each input item into a sequence of zero or more items in the output, in order. It's possible someone might intend a mergeMap instead, but I think that one is inherently more dangerous if you're not certain it's what you want.)

benlesh commented 11 months ago

For context, my advice to people trying to figure out which flattening operation to choose in RxJS is basically: "default to concatMap unless you want implicit cancellation, then use switchMap"... mergeMap is probably used too often and its results can be non-deterministic, and exhaustMap has limited use cases. The main web-related use case I've had for exhaustMap has been touch events.

concatMap is definitely the safest of the four. switchMap is the most "wow, this is useful" in many scenarios.

If folks would like switchMap, I'm happy to add it and see what sticks. The classic scenario is a lookahead search:

input.on('input').switchMap(async () => {
  const res = await fetch(`/lookahead?q=${input.value}`);

  if (!res.ok) {
    console.log('bad lookahead');
    return [];
  }

  return res.json();
})
.subscribe({
  next: data => {
    results.innerHTML = data
      .map(d => `<li><button type="button" tabIndex="0">${d}</button></li>`)
      .join('\n');
  }
})
MaximSagan commented 11 months ago

concatMap is definitely the safest of the four

Isn't concatMap the most likely cause of backpressure in a reactive application?

benlesh commented 11 months ago

Isn't concatMap the most likely cause of backpressure in a reactive application?

This is off topic, but just to share:

At worst, concatMap is processing elements of an array. No different than a for..of with an await in it. The only difference is that in theory the array could be unbounded. However in practice, it's very, very rarely unbounded. The values stop eventually and the system catches up. In the event that you're getting a stream of elements and populating a queue so fast that you can't possibly process them, your architecture is wrong, and the types you've chosen are incorrect. You'll need to switch to something like an AsyncIterable that's able to control how often you get updates from the source, or you'll need to switch to a strategy that is perhaps "lossy" to allow breathing room.

In almost a decade, I've never seen concatMap be the source of a back pressure issue.

The reason I say that concatMap is the "safest" choice for most things:

  1. It orders everything deterministically in an easy-to-reason-about way (unlike mergeMap).
  2. There's no inherent cancellation (like switchMap) that can cause issues that are hard for people to figure out.

Consider this:

document.on('click')
  .filter(e => e.target.matches('button.cancel-order'))
  .switchMap(e =>
    fetch('/order-cancel', {
      method: 'POST',
      body: e.target.dataset.id,
    })
    .then(resp => resp.json())
  )
  .subscribe(deleted => {
     document.querySelector(`.order[data-id="${deleted.id}"]`).remove();
  })

With switchMap above, if the user clicks "cancel" on a few items faster than the /order-cancel endpoint can respond, the list will not be updated properly, because the next click will cancel the processing of the previous.

If you switch that with concatMap (flatMap in this proposal) it will be fine.

That said, switchMap is incredibly useful for switching between streams, or cancelling things you really do want to cancel (for example any sort of GET fetch is probably fine to interrupt and start over).
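The difference between the two operators in the order-cancel scenario can be illustrated with a simplified timeline model. This is a sketch under stated assumptions, not the proposal's algorithm: `deliveredWithSwitch` and `deliveredWithConcat` are hypothetical names, each click is reduced to an arrival time and a response time in milliseconds, and the concatMap-like model assumes a request starts only once the previous one has finished.

```javascript
// A simplified timeline model (an assumption for illustration only).
// Each click has an arrival time `at` and a response time `duration`,
// both in milliseconds. Clicks must be sorted by `at`.

// switchMap-like: a response reaches the subscriber only if no later click
// arrives before that response completes.
function deliveredWithSwitch(clicks) {
  return clicks
    .filter((click, i) => {
      const next = clicks[i + 1];
      return !next || next.at >= click.at + click.duration;
    })
    .map((click) => click.id);
}

// concatMap-like: each request waits for the previous one to finish, and
// every response reaches the subscriber, in order.
function deliveredWithConcat(clicks) {
  let freeAt = 0;
  return clicks.map((click) => {
    const startedAt = Math.max(freeAt, click.at);
    freeAt = startedAt + click.duration;
    return { id: click.id, deliveredAt: freeAt };
  });
}

// Three quick clicks, each taking 200 ms to get a response.
const clicks = [
  { id: 'order-1', at: 0, duration: 200 },
  { id: 'order-2', at: 50, duration: 200 },
  { id: 'order-3', at: 100, duration: 200 },
];

console.log(deliveredWithSwitch(clicks));
console.log(deliveredWithConcat(clicks));
```

In this model the switchMap-like strategy delivers only the last order's response, while the concatMap-like strategy delivers all three, each delayed behind the one before it.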

benlesh commented 8 months ago

From @bakkot here

Your first example kind of scares me as written - the right thing is to cancel the fetch, and I wouldn't want to add a convenience method which makes the wrong thing much easier than the right thing. So I would definitely want to explore avenues for making abort-on-unsubscribe patterns easier, if we were to add this.

One wild idea for that: the mapper (in this and potentially other callback-taking methods) could take as its third argument a { signal }, created by and internal to the operator, so that you could do

input.on('input')
  .switchMap(async (e, idx, { signal }) => {
    const response = await fetch(`/search?q=${e.target.value}`, { signal });
    /* ... */
  })
  .subscribe((results) => { /* ... */ })

and have the signal automatically abort when the observable returned by the callback is unsubscribed. (Similar idea suggested at https://github.com/tc39/proposal-iterator-helpers/issues/162#issuecomment-968929171, though I think it's more relevant to switchMap than map etc because unsubscription of the inner thing can happen prior to unsubscription of the outer thing.)
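A rough sketch of the operator-owned signal idea, written without the proposed Observable API. `withSwitchSignal` is a hypothetical name used only for illustration. The sketch covers just the "switch" case, where a new value aborts the previous call's signal; aborting on unsubscription of the outer observable is not modeled. It assumes an environment with a global `AbortController`.

```javascript
// Hypothetical sketch: `withSwitchSignal` is an illustrative name, not an API
// from the proposal. It wraps a mapper so each call receives a fresh
// AbortSignal, and the previous call's signal is aborted first.
function withSwitchSignal(mapper) {
  let controller = null;
  let index = 0;

  return function invoke(value) {
    controller?.abort(); // The "switch": cancel whatever was in flight.
    controller = new AbortController();
    return mapper(value, index++, { signal: controller.signal });
  };
}

// Usage: record the signal handed to each call.
const signals = [];
const onInput = withSwitchSignal((value, idx, { signal }) => {
  signals.push(signal);
  return value;
});

onInput('a');
onInput('ab');

console.log(signals[0].aborted, signals[1].aborted);
```

The mapper never creates or stores a controller itself, which is the ergonomic point of the suggestion: the cancellation bookkeeping lives inside the operator.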

benlesh commented 8 months ago

I wouldn't want to add a convenience method which makes the wrong thing much easier than the right thing.

@bakkot I understand your concern, however, given the design of fetch and AbortSignal, it's already much, much easier to do the wrong thing than the right thing in a lot of contexts. Consider the following approaches of doing this same sort of lookahead search:

Naive approach

This is the sort of thing most people will opt for today. In part because the dance around AbortSignal and AbortError is a little fiddly, and also in part because in many cases people just don't care if they eat the perf hit of not aborting the fetch.

let currentSearch = '';
searchInput.addEventListener('input', async (e) => {
  const searchText = e.target.value;
  if (searchText !== currentSearch) {
    currentSearch = searchText;
    const response = await fetch(`/search?q=${searchText}`);
    if (currentSearch !== searchText) return;

    if (response.ok) {
      const results = await response.json();
      if (currentSearch !== searchText) return;
      resultsList.innerHTML = results
        .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
        .join('');
    }
  }
});

Raw AbortSignal Approach

This is what you'd have to do today to do things the "correct" way using AbortSignal.

let ac;
let currentSearch = '';
searchInput.addEventListener('input', async (e) => {
  const searchText = e.target.value;
  ac?.abort();
  if (searchText !== currentSearch) {
    currentSearch = searchText;
    ac = new AbortController();
    try {
      const response = await fetch(`/search?q=${searchText}`, { signal: ac.signal });

      if (response.ok) {
        const results = await response.json();
        resultsList.innerHTML = results
          .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
          .join('');
      }
    } catch (error) {
      if (error?.name !== 'AbortError') {
        throw error;
      }
    }
  }
});

Observables With Naive switchMap

Here we're emulating the naive approach above, just with observables.

let currentSearch = '';
searchInput.on('input')
  .map(e => e.target.value)
  .filter(searchText => searchText !== currentSearch)
  .switchMap(async (searchText) => {
    currentSearch = searchText;
    const response = await fetch(`/search?q=${searchText}`);

    if (response.ok) {
      return await response.json();
    }
  })
  .subscribe((results) => {
    if (results) {
      resultsList.innerHTML = results
        .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
        .join('');
    }
  })

Observable With "Abort Aware" switchMap

In this implementation, switchMap provides an argument that is an AbortSignal that would abort when the "switch" occurred. switchMap would also be aware that errors that are AbortErrors can be handled gracefully, for ergonomic reasons.

Notice it's barely different than the naive observable approach above.

let currentSearch = '';
searchInput.on('input')
  .map(e => e.target.value)
  .filter(searchText => searchText !== currentSearch)
  .switchMap(async (searchText, i, signal) => { // <--
    currentSearch = searchText;
    const response = await fetch(`/search?q=${searchText}`, { signal }); // <--

    if (response.ok) {
      return await response.json();
    }
  })
  .subscribe((results) => {
    if (results) {
      resultsList.innerHTML = results
        .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
        .join('');
    }
  })
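The "handle AbortErrors gracefully" part can be sketched as a small helper. This is an assumption about what such handling might look like, not behavior defined by the proposal, and `ignoreAbortError` is a hypothetical name. It relies only on the fact that an aborted `fetch` rejects with an error whose `name` is `'AbortError'`.

```javascript
// Hypothetical helper, not part of the proposal: treats an AbortError as
// "no result" and rethrows anything else.
function ignoreAbortError(error) {
  if (error?.name === 'AbortError') {
    return undefined;
  }
  throw error;
}

// Stand-in for the rejection produced by an aborted fetch (illustrative only).
const abortError = Object.assign(new Error('The operation was aborted'), {
  name: 'AbortError',
});

console.log(ignoreAbortError(abortError));
```

With promises it could be attached as `somePromise.catch(ignoreAbortError)`, so an aborted request resolves to `undefined` while any other failure still propagates.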
tabatkins commented 8 months ago

Yup, I was about to type up a similar (tho much less detailed) response - the lack of a switchMap makes an even more wrong implementation the easiest thing to do, with worse results for the user. Thinking about how we can make this pattern even better is very useful, but we shouldn't discard it just because it's not perfect. The pattern it's addressing will be written anyway, just wrongly.

bakkot commented 8 months ago

Edit: I wrote a thing here which I think was wrong given how flatMap actually works. I'll have to think more about the differences here.

bakkot commented 8 months ago

OK, so, what do you actually write if you have observables, but not switchMap, and are thinking about the fact that you need to switch? I wrote a thing with flatMap originally, which was wrong because flatMap apparently queues its inputs rather than its outputs (which I guess makes sense?). So instead, something like

let ac;

let currentSearch = '';
searchInput.on('input')
  .map(e => e.target.value)
  .filter(searchText => searchText !== currentSearch)
  .map(async (searchText) => {
    ac?.abort();
    ac = new AbortController();
    try {
      currentSearch = searchText;
      const response = await fetch(`/search?q=${searchText}`, { signal: ac.signal });

      if (response.ok) {
        return await response.json();
      }
    } catch (e) {
      if (e.name !== 'AbortError') throw e;
    }
  })
  .flatMap(x => x)
  .filter(v => v !== void 0)
  .subscribe((results) => {
    if (results) {
      resultsList.innerHTML = results
        .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
        .join('');
    }
  })

?

benlesh commented 8 months ago

@bakkot you can always join map(fn).flatMap(x => x) to be just flatMap(fn).

(EDIT: Never mind, you can't in this case because of the dance around getting cancellation to work to make it like switchMap! Wow. That got me for a second... 😬)

That said, yes to emulate switchMap you'd have to do something like what you did above, where you'd have to close over an AbortController instance within each subscription. Worse, you'd need to trick the cancellation into happening before you entered the flatMap body.

Then, if you're using an async function, as you did because it's ergonomic for fetch, you'd have to have that filter(v => v !== void 0), as you did, to avoid the empty results that would happen after the cancellation. Otherwise, since AsyncIterable converts cleanly to observable, you could use async function* to avoid the extra filter:

let ac;
let currentSearch = '';
searchInput.on('input')
  .map(e => e.target.value)
  .filter(searchText => {
    if (searchText !== currentSearch) {
      currentSearch = searchText;
      ac?.abort(); // This can't happen inside the `flatMap`!
      return true;
    }
    return false;
  })
  .flatMap(async function* (searchText) {
    ac = new AbortController();
    try {
      const response = await fetch(`/search?q=${searchText}`, { signal: ac.signal });

      if (response.ok) {
        yield await response.json();
      }
    } catch (error) {
      if (error.name !== 'AbortError') {
        throw error;
      }
    }
  })
  .subscribe((results) => {
    if (results) {
      resultsList.innerHTML = results
        .map(result => `<li><a href="${result.href}">${result.text}</a></li>`)
        .join('');
    }
  })
bakkot commented 8 months ago

Given the subtlety of getting the right behavior with flatMap (it makes me feel better that @benlesh also missed it on first glance) I am convinced that adding switchMap would be an improvement.

I still think it would be nice to have the automatic signal argument, though. Or something else in that vein to make it easier to cancel outstanding work.

domfarolino commented 4 months ago

I've filed https://github.com/WICG/observable/issues/134 to continue the more targeted discussion about the automatic signal argument for switchMap() since we've landed on adding the operator in general, both in this issue and in https://github.com/WICG/observable/issues/126.