WICG / observable

Observable API proposal
https://wicg.github.io/observable/
Other
546 stars 12 forks source link

Adds the catch operator to the README #87

Closed benlesh closed 1 month ago

benlesh commented 7 months ago

Makes progress on https://github.com/WICG/observable/issues/64. Simply adds the catch operator to the README. This is one of the most widely used operators. It's particularly useful in preventing inner subscriptions from terminating outer subscriptions in cases like long-polling or events that kick off other event streams.

For example, let's say someone creates an observable from a web socket:

function socket(url) {
  return new Observable(subscriber => {
    const ws = new WebSocket(url);

    subscriber.addTeardown(() => {
      if (ws.readyState <= WebSocket.OPEN) {
        ws.close();
      }
    });

    ws.onopen = (e) => {
      subscriber.next(e);
    };

    ws.onmessage = (e) => {
      subscriber.next(e);
    };

    ws.onerror = (e) => {
      subscriber.error(e);
    };

    ws.onclose = (e) => {
      if (e.wasClean) {
        subscriber.complete();
      } else {
        subscriber.error(e);
      }
    };
  })
}

const socketStream = socket('wss://someendpoint')

socketStream
  .do(e => {
    if (e.type === 'open') {
      const socket = e.target;
      socket.send('start');
    }
  })
  .filter(e => e.type === 'message')
  .map(e => JSON.parse(e.data))
  .subscribe(console.log);

The above stream will terminate the socket under the following error conditions:

  1. The socket closes "dirty", often due to losing network.
  2. The socket experiences another error
  3. The message data fails JSON.parse.

Ideally, in the first two cases, we can reconnect the socket. To do this, we can use catch:

// A utility for timeout
function timeout(ms) {
  return new Observable(subscriber => {
    const id = setTimeout(() => {
      subscriber.next();
      subscrber.complete();
    }, ms);
    subscriber.addTeardown(() => clearTimeout(id));
  });
}

// This is now a socket that will retry if it fails.
const socketStream = socket('wss://someendpoint')
  .catch(err => {
    // log the error
    console.error(err);

    if (err.type === 'close' && !err.wasClean) {
      if (!navigator.onLine) {
        // Wait until we're online and reconnect
        return window.on('online')
          .take(1)
          .flatMap(() => socketStream)
      }
    }
    // Wait for 1 second and try again
    return timeout(1000)
      .flatMap(() => socketStream);
  });

socketStream
  .do(e => {
    if (e.type === 'open') {
      const socket = e.target;
      socket.send('start');
    }
  })
  .filter(e => e.type === 'message')
  .map(e => {
     // These sorts of errors are generally going to be
     // resolved the traditional way.
     try {
       return JSON.parse(e.data)
     } catch (err) {
       console.error(err);
       return { parseError: err }
     }
  })
  .subscribe(console.log);
domfarolino commented 6 months ago

This seems to add back all of the IDL to the README file. Maybe this means to instead just add a couple lines to the README, and a few more to the spec?