rocket-connect / for-emit-of

Turn Node.js Events into Async Iterables.
https://www.npmjs.com/package/for-emit-of
MIT License

Performance improvement for faster iterations #23

Closed Farenheith closed 2 years ago

Farenheith commented 2 years ago

Hello!

I just had an idea that seems quite reasonable and could speed up iteration over the async iterables generated by this package. Under the hood, this package builds a queue of all the values that need to be yielded and, when the queue is empty, it waits a few milliseconds to see whether a new value has arrived, right?
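
To make the question concrete, the polling behaviour described above can be sketched roughly as follows. This is only an illustration of the pattern, not for-emit-of's actual source: the helper name `pollingIterable`, its options `endEvent` and `sleepMs`, and the 10 ms default are all assumptions made for the example.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: illustrates the "queue + sleep when empty" pattern only.
// It is NOT taken from for-emit-of; names and defaults are assumptions.
function pollingIterable(emitter, event, { endEvent = 'end', sleepMs = 10 } = {}) {
  const queue = [];
  let done = false;
  const onEvent = (value) => queue.push(value);
  const onEnd = () => { done = true; };
  // Listeners are attached eagerly so no event is missed before iteration starts
  emitter.on(event, onEvent);
  emitter.once(endEvent, onEnd);

  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  return {
    async *[Symbol.asyncIterator]() {
      try {
        while (true) {
          if (queue.length > 0) {
            yield queue.shift();
          } else if (done) {
            return;
          } else {
            // Queue is empty: this wait is the cost the proposal tries to remove
            await sleep(sleepMs);
          }
        }
      } finally {
        emitter.off(event, onEvent);
        emitter.off(endEvent, onEnd);
      }
    },
  };
}

// Usage: collect every 'data' value until 'end' is emitted
async function demo() {
  const emitter = new EventEmitter();
  setTimeout(() => {
    emitter.emit('data', 1);
    emitter.emit('data', 2);
    emitter.emit('end');
  }, 5);
  const seen = [];
  for await (const value of pollingIterable(emitter, 'data')) {
    seen.push(value);
  }
  return seen;
}
```

In a sketch like this, a consumer that outpaces the emitter hits the `sleep` branch on almost every iteration, so each value can be delayed by up to `sleepMs`.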

Well, the thing is: when the queue is empty, instead of waiting a few milliseconds, it could temporarily disable the queue control and yield the event result directly, as soon as it happens! If the for await loop consumes values faster than the events are emitted, then this gives a great performance improvement, since it entirely removes the milliseconds wasted whenever the queue is empty!

Here's an example of the code I used to test the concept. You just need to create a big.txt file with a large number of lines to check it out, or you can use this one: big.txt

      const fs = require('fs');
      const readline = require('readline');

      const rl = readline.createInterface({
        input: fs.createReadStream('big.txt'),
      });
      const iterable = {
          [Symbol.asyncIterator]() {  
            let onError;
            let onClose;
            let onLine;
            let queue = {};
            let error;
            onError = (value) => {
              rl.off('close', onClose);
              rl.off('line', onLine);
              error = value;
            };
            // Tracks end of input separately, so lines still queued at close are not dropped
            let closed = false;
            onClose = () => {
              rl.off('error', onError);
              rl.off('line', onLine);
              closed = true;
            };
            onLine = (value) => {
              // Append to a singly linked list: queue.next is the head, queue.last the tail
              const node = { value };
              if (queue.last) {
                queue.last = queue.last.next = node;
              } else {
                queue.last = queue.next = node;
              }
            };
            rl.on('line', onLine);
            rl.once('error', onError);
            rl.once('close', onClose);
            function next() {
              if (closed && !queue.next) {
                return { done: true };
              }
              if (error) {
                throw error;
              }
              if (queue.next) {
                const { value } = queue.next;
                queue.next = queue.next.next;
                if (!queue.next) {
                  queue.last = undefined;
                }
                return {
                  value
                };
              } else {
                // The queue is empty, so disable the queue control and return a new Promise
                // that resolves when the next line event is emitted
                rl.off('line', onLine);
                return new Promise((resolve, reject) => {
                  let onErrorOnce;
                  let onCloseOnce;
                  let onLineOnce;
                  onErrorOnce = (value) => {
                    rl.off('close', onCloseOnce);
                    rl.off('line', onLineOnce);
                    reject(value);
                  };
                  onCloseOnce = () => {
                    rl.off('error', onErrorOnce);
                    rl.off('line', onLineOnce);
                    resolve({ done: true });
                  };
                  onLineOnce = (value) => {
                    rl.off('close', onCloseOnce);
                    rl.off('error', onErrorOnce);
                    // Before yielding that single result, re-enable the queue control, so there is no risk of losing data
                    rl.on('line', onLine);
                    resolve({ value });
                  };
                  rl.once('line', onLineOnce);
                  rl.once('error', onErrorOnce);
                  rl.once('close', onCloseOnce);
                });
              }
            }
            return {
              next
            }
          }
        }

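      // for await is only valid inside an async function (or an ES module), so wrap the loop
      async function main() {
        let i = 0;
        for await (const line of iterable) {
          i += 1;
        }
        console.log(`Read ${i} lines`);
      }
      main();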

In this example, rl is already an async iterable, but with this approach I got 34% better performance than iterating over it directly, though still 60% worse than just listening to the events. There may be further performance improvements available in this strategy, but I decided to open the issue here to start the discussion. I began looking into this after I saw this issue: https://github.com/nodejs/node/issues/31979. First, I tried for-emit-of to see whether I could get better performance than direct iteration, but unfortunately it was considerably slower. Then I tried this, and I think it's a very good improvement for your package!
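
For anyone who wants to reproduce the comparison, a minimal timing harness might look like the sketch below. It is not the script used for the numbers above; the helper names (`countWithEvents`, `countWithAsyncIterator`, `time`, `compare`) are made up for this example, and it only covers the plain event listener and readline's built-in async iterator. The custom iterable from the example could be timed the same way.

```javascript
const fs = require('fs');
const readline = require('readline');

// Hypothetical helpers for illustration only; not part of for-emit-of.

// Baseline: count lines by listening to the 'line' event directly
function countWithEvents(path) {
  return new Promise((resolve, reject) => {
    const input = fs.createReadStream(path);
    const rl = readline.createInterface({ input });
    let count = 0;
    rl.on('line', () => { count += 1; });
    rl.once('close', () => resolve(count));
    input.once('error', reject);
  });
}

// Count lines using readline's own async iterator
async function countWithAsyncIterator(path) {
  const rl = readline.createInterface({ input: fs.createReadStream(path) });
  let count = 0;
  for await (const _line of rl) {
    count += 1;
  }
  return count;
}

// Run fn once and report the elapsed wall-clock time in milliseconds
async function time(label, fn) {
  const start = process.hrtime.bigint();
  const result = await fn();
  const ms = Number(process.hrtime.bigint() - start) / 1e6;
  console.log(`${label}: ${result} lines in ${ms.toFixed(1)} ms`);
  return { result, ms };
}

// Time both strategies against the same file, one after the other
async function compare(path) {
  const events = await time('events', () => countWithEvents(path));
  const iterator = await time('for await', () => countWithAsyncIterator(path));
  return [events, iterator];
}
```

Calling `compare('big.txt')` prints one line per strategy. A single run is noisy (disk cache, JIT warm-up), so it is worth repeating each measurement several times before drawing conclusions.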

danstarns commented 2 years ago

Merged in #24