rocket-connect / for-emit-of

Turn Node.js Events into Async Iterables.
https://www.npmjs.com/package/for-emit-of
MIT License
9 stars 2 forks source link

About events.on #25

Open Farenheith opened 2 years ago

Farenheith commented 2 years ago

I was working on a PR to Node.js to improve the performance of the readline async iterator (https://github.com/nodejs/node/pull/41276), and after some reviews it ended up changing the function events.on, which transforms emitted events into an async iterator, much like this library does. However, the native function doesn't support close events to end the iterator.

The changes I made there, though, can improve the function's performance by up to 50% if the PR is accepted. Since the new version of the function will do pretty much what this package does, I'm posting the code here in case you want to adapt it for your package. The main ideas behind it were not mine; they already existed in the Node.js code. My main contributions were using the FixedQueue and adding watermark control and support for close events. Here it is:

/**
 * Returns an `AsyncIterator` that iterates `event` events.
 *
 * Events that arrive while nobody is waiting are buffered; pending `next()`
 * calls are buffered too and resolved as events arrive. When the buffer grows
 * past `highWatermark` the emitter is paused, and it is resumed once the
 * buffer drops below `lowWatermark`. Any event listed in `close` ends the
 * iteration.
 * @param {EventEmitter} emitter
 * @param {string | symbol} event
 * @param {{
 *    signal?: AbortSignal,
 *    close?: string[],
 *    highWatermark?: number,
 *    lowWatermark?: number
 *   }} [options]
 * @returns {AsyncIterator}
 */
function on(emitter, event, options = {}) {
  // Parameters validation
  // NOTE(review): `signal` is undefined when no options are given, so
  // validateAbortSignal presumably accepts undefined - confirm.
  const signal = options.signal;
  validateAbortSignal(signal, 'options.signal');
  if (signal?.aborted)
    throw new AbortError(undefined, { cause: signal?.reason });
  // A falsy watermark (including 0) falls back to the default.
  const highWatermark = options.highWatermark || NumberMAX_SAFE_INTEGER;
  validateInteger(highWatermark, 'options.highWatermark', 1);
  const lowWatermark = options.lowWatermark || 1;
  validateInteger(lowWatermark, 'options.lowWatermark', 1);

  // Preparing controlling queues and variables
  // FixedQueue is loaded lazily, on the first call.
  if (!FixedQueue) FixedQueue = require('internal/fixed_queue');
  // Iterator results for events nobody has asked for yet.
  const unconsumedEvents = new FixedQueue();
  // { resolve, reject } pairs of next() calls still waiting for an event.
  const unconsumedPromises = new FixedQueue();
  let paused = false;
  let error = null;
  let finished = false;
  // Number of entries currently held in unconsumedEvents.
  let size = 0;

  const iterator = ObjectSetPrototypeOf({
    next() {
      // First, we consume all unread events
      if (size) {
        const value = unconsumedEvents.shift();
        size--;
        if (paused && size < lowWatermark) {
          emitter.resume();
          paused = false;
        }
        // Buffered entries are already iterator results (built in
        // eventHandler). Wrap them in a resolved promise so that next()
        // returns a promise on this path too, like every other path below.
        return PromiseResolve(value);
      }

      // Then we error, if an error happened
      // This happens one time if at all, because after 'error'
      // we stop listening
      if (error) {
        const p = PromiseReject(error);
        // Only the first element errors
        error = null;
        return p;
      }

      // If the iterator is finished, resolve to done
      if (finished) return closeHandler();

      // Wait until an event happens
      return new Promise(function(resolve, reject) {
        unconsumedPromises.push({ resolve, reject });
      });
    },

    return() {
      return closeHandler();
    },

    throw(err) {
      if (!err || !(err instanceof Error)) {
        throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
                                       'Error', err);
      }
      errorHandler(err);
    },
    [kWatermarkData]: {
      /**
       * The current queue size
       */
      get size() {
        return size;
      },
      /**
       * The low watermark. The emitter is resumed every time size is lower than it
       */
      get low() {
        return lowWatermark;
      },
      /**
       * The high watermark. The emitter is paused every time size is higher than it
       */
      get high() {
        return highWatermark;
      },
      /**
       * It checks whether the emitter is paused by the watermark controller or not
       */
      get isPaused() {
        return paused;
      }
    },
  }, AsyncIteratorPrototype);

  // Adding event handlers
  const { addEventListener, removeAll } = listenersController();
  if (!kFirstEventParam) kFirstEventParam = require('internal/events/symbols').kFirstEventParam;
  // With options[kFirstEventParam] set, the handler receives only the first
  // listener argument; otherwise all arguments are collected into an array.
  addEventListener(emitter, event, options[kFirstEventParam] ? eventHandler : function(...args) {
    return eventHandler(args);
  });
  if (event !== 'error' && typeof emitter.on === 'function') {
    addEventListener(emitter, 'error', errorHandler);
  }
  const closeEvents = options?.close;
  if (closeEvents && closeEvents.length) {
    for (let i = 0; i < closeEvents.length; i++) {
      addEventListener(emitter, closeEvents[i], closeHandler);
    }
  }
  if (signal) {
    addEventListener(
      signal,
      'abort',
      abortListener,
      { once: true });
  }

  return iterator;

  // Turns an abort of `signal` into an AbortError on the iterator.
  function abortListener() {
    errorHandler(new AbortError(undefined, { cause: signal?.reason }));
  }

  // Hands the event to the oldest waiting next() call, or buffers it
  // (pausing the emitter once the buffer exceeds the high watermark).
  function eventHandler(value) {
    const arg = createIterResult(value, false);
    if (unconsumedPromises.isEmpty()) {
      size++;
      if (!paused && size > highWatermark) {
        paused = true;
        emitter.pause();
      }
      unconsumedEvents.push(arg);
    } else unconsumedPromises.shift().resolve(arg);
  }

  // Rejects the oldest waiting next() call, or stores the error for the
  // next call to next(); then finishes the iterator.
  function errorHandler(err) {
    if (unconsumedPromises.isEmpty()) error = err;
    else unconsumedPromises.shift().reject(err);

    closeHandler();
  }

  // Removes the listeners, marks the iterator as finished and resolves every
  // waiting next() call with a `done` result.
  function closeHandler() {
    removeAll();
    finished = true;
    const doneResult = createIterResult(undefined, true);
    while (!unconsumedPromises.isEmpty()) {
      unconsumedPromises.shift().resolve(doneResult);
    }

    return PromiseResolve(doneResult);
  }
}

The FixedQueue code isn't exposed by Node.js, but you can find it here (the `internal/fixed_queue` module in the Node.js source). It is a pretty smart queue implementation that takes advantage of how V8 manages memory, and it really improved the performance big time!