rocket-connect / for-emit-of

Turn Node.js Events into Async Iterables.
https://www.npmjs.com/package/for-emit-of
MIT License

Enhancement/Idea: cancelable/abortable iterators #13

Closed Farenheith closed 4 years ago

Farenheith commented 4 years ago

I was thinking about a problem we can have with this library due to the iterator design. Imagine code like this:

async function* firstIteration(): AsyncIterable<number> {
  const iterable = forEmitOf(someHeavyEmitter);
  for await (const item of iterable) {
    yield item * 2;
  }
}

async function* secondIteration(previousIterable: AsyncIterable<number>) {
  for await (const item of previousIterable) {
      if (item > 20) {
          break;
      }
      yield item;
  }
}

async function run() {
   for await (const item of secondIteration(firstIteration())) {
    console.log(item);
   }
}

run();

The break in the second iterable will make the code behave as expected on the surface, but, behind the scenes, the events array created by forEmitOf may keep receiving new items, which can ultimately lead to serious memory leaks.

Idea

As a matter of fact, the user of this library must be aware of this situation and avoid breaking out of the for await in cases like that, but it would be nice to have some abort mechanism to make sure no hidden process keeps filling the memory. Such a mechanism must be propagable, as I can chain an indefinite number of iterables. Also, I think it should be separate from this library, as it is not necessarily tied to iterables generated from EventEmitters.

The first idea was something used like this:

function firstIteration(): AsyncIterable<number> {
  return cancellable(async function* () {
    // internally, forEmitOf would return a cancellable iterable with a custom operation
    const iterable = forEmitOf(someHeavyEmitter); 
    for await (const item of iterable) {
      yield item * 2;
    }
  });
}

function secondIteration(previousIterable: AsyncIterable<number>) {
  return cancellable(async function* internal() {
    for await (const item of previousIterable) {
        if (item > 20) {
            cancel(previousIterable);
            break;
        }
        yield item;
    }
  });
}

async function run() {
   for await (const item of secondIteration(firstIteration())) {
    console.log(item);
   }
}

run();

While the iteration works from the inside out (starting at the first iterable in the chain), the cancel call would work from the outside in, propagating the cancellation as long as the referenced iterable is cancellable.
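
To make the propagation idea more concrete, here is a minimal sketch of what cancellable and cancel could look like. These helpers are hypothetical and not part of for-emit-of; the names just follow the example above:

const CANCEL = Symbol("cancel");

interface Cancellable<T> extends AsyncIterable<T> {
  [CANCEL](): void;
}

// Wraps an async generator factory and remembers every iterator it hands out,
// so cancel() can call return() on all of them.
function cancellable<T>(factory: () => AsyncGenerator<T>): Cancellable<T> {
  const iterators = new Set<AsyncGenerator<T>>();
  return {
    [Symbol.asyncIterator]() {
      const iterator = factory();
      iterators.add(iterator);
      return iterator;
    },
    [CANCEL]() {
      // return() resumes the generator with a return completion; a generator
      // suspended inside a "for await...of" will then close its inner iterator
      // too, so the cancellation ripples toward the innermost source.
      for (const iterator of iterators) {
        void iterator.return(undefined);
      }
    },
  };
}

// No-op when the iterable was not created by cancellable().
function cancel(iterable: AsyncIterable<unknown>): void {
  const hook = (iterable as { [CANCEL]?: () => void })[CANCEL];
  if (hook) {
    hook.call(iterable);
  }
}

With a shape like this, the cancel(previousIterable) call in secondIteration above would stop the chain even though the break happens in the outermost loop, provided each inner iterable honours return().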

I know it's a bit of a pain to use a feature designed like that, but it's the best idea I could come up with so far. In the library I'm creating I would do all of this automatically, so the usability would not be bad at all, but still, if the user writes a for await...of with a break without handling the cancellation, they can run into that issue.

What do you think? I really want some opinions about this approach because I'm not quite sure about it, and I feel that if I want to publish this feature I need to deal with this situation, as it's quite easy for the user to get lost in a chain of operations.

Farenheith commented 4 years ago

@danstarns I managed to create a test where you can simulate the memory leak:

const { sleep } = require('./dist/sleep');
const { instant } = require('./dist/instant');
const forEmitOf = require('./dist');
const { EventEmitter } = require('events');

async function leakTheMemory(emitter) {
    const duration = 5 * 60 * 1000;
    const initial = instant();
    let lastMemLog = 0;

    while (instant() - initial < duration){
        for (let i = 0; i < 10000; i++) {
            emitter.emit('data', '1'.repeat(100000));
        }
        await sleep(0);
        const second = Math.floor((instant() - initial) / 1000);
        if (second - lastMemLog >= 1) {
            lastMemLog = second;
            console.log(`Using ${(process.memoryUsage().heapUsed / (1024 * 1024)).toFixed(2).padStart(10)} MB after ${second} s`);
        }
    }
    console.log('ended emission');
    await sleep(10);
    console.log('end');
}

const emitter = new EventEmitter();

const iterable = forEmitOf(emitter, { debug: true });

setImmediate(() => leakTheMemory(emitter));

async function iterate() {
    let count = 0;
    for await (const item of iterable) {
        if (count > 10) {
            break;
        }
        count++;
    }
    console.log('ended iteration');
}

iterate();

The memory growth is due to the events array, which keeps receiving the new events.
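
For context, this growth pattern is not specific to this library: any event-to-async-iterable adapter that buffers emissions in an array behaves this way once the consumer stops pulling. A stripped-down sketch of that pattern (not the actual for-emit-of source) looks like this:

import { EventEmitter } from "events";

// Every event is pushed into an in-memory buffer; the iterator shifts items
// off as the consumer pulls. If the consumer breaks out of its loop and
// nothing removes the listener, the buffer keeps growing unbounded.
function naiveForEmitOf<T>(emitter: EventEmitter, event = "data"): AsyncIterable<T> {
  const buffer: T[] = [];
  emitter.on(event, (payload: T) => buffer.push(payload));

  return {
    async *[Symbol.asyncIterator]() {
      while (true) {
        if (buffer.length > 0) {
          yield buffer.shift() as T;
        } else {
          // Wait a tick for new events. A real implementation would also race
          // against "end"/"error" events and timeouts instead of busy-waiting.
          await new Promise<void>((resolve) => setImmediate(resolve));
        }
      }
    },
  };
}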

Farenheith commented 4 years ago

@danstarns I just want to thank you for accepting this issue!

Take a look at this test:

const { sleep } = require("./dist/sleep");
const { instant } = require("./dist/instant");
const forEmitOf = require("./dist");
const { EventEmitter } = require("events");

async function leakTheMemory(emitter) {
    const duration = 5 * 60 * 1000;
    const initial = instant();
    let lastMemLog = 0;

    while (instant() - initial < duration) {
        for (let i = 0; i < 10000; i++) {
            emitter.emit("data", "1".repeat(100000));
        }
        await sleep(0);
        const second = Math.floor((instant() - initial) / 1000);
        if (second - lastMemLog >= 1) {
            lastMemLog = second;
            console.log(`Using ${(process.memoryUsage().heapUsed / (1024 * 1024)).toFixed(2).padStart(10)} MB after ${second} s`);
        }
    }
    console.log("ended emission");
    await sleep(10);
    console.log("end");
}

const emitter = new EventEmitter();

const iterable = forEmitOf(emitter, {
    debug: true,
});

setImmediate(() => leakTheMemory(emitter));

async function* intermediateIterable() {
    let count = 0;
    for await (const item of iterable) {
        console.log(`intermediating ${count}`);
        yield item;
        count++;
    }
}

async function iterate() {
    let count = 0;
    for await (const item of intermediateIterable()) {
        if (count > 10) {
            break;
        }
        count++;
    }
    console.log("ended iteration");
}

iterate();

A slightly different test from the last one I posted here. There is an intermediate for await in the function intermediateIterable, so the last for await does not iterate directly over the AsyncIterable returned by forEmitOf. After the merge, take a look at how the console log looks for this code:

for-emit-of git:(master) node test.js
No more results to yield but emitter still active. Starting timeout race
Finished response racing. Winner: undefined
Results to yield: 10000
intermediating 0
Results to yield: 19999
intermediating 1
Results to yield: 29998
intermediating 2
Results to yield: 39997
intermediating 3
Results to yield: 49996
intermediating 4
Results to yield: 59995
intermediating 5
Results to yield: 69994
intermediating 6
Results to yield: 79993
intermediating 7
Results to yield: 89992
intermediating 8
Results to yield: 99991
intermediating 9
Results to yield: 109990
intermediating 10
Results to yield: 119989
intermediating 11
Iterator return called and process finalized
ended iteration
Using     123.63 MB after 1 s
Using     113.58 MB after 2 s
Using     119.72 MB after 3 s
Using     117.74 MB after 4 s
...

Notice that the outer for await executed 11 times (based on the Results to yield lines), while the intermediate one iterated 12 times (based on the intermediating lines), but right after that, return() is called by the internal iterator machinery and the iteration ends with no further memory consumption!

Do you see how powerful this is? No matter how many intermediate iterables I put between the forEmitOf and the for await that actually drives the execution, as long as all the intermediate iterators call return(), everything works as expected! This is really great!
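
For anyone reading along, this propagation can be reproduced without for-emit-of at all. A small standalone demo: the break in the outermost loop calls return() on the intermediate generator, which in turn runs the cleanup of the inner one:

async function* inner(): AsyncGenerator<number> {
  try {
    let i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    // Runs even though only the outermost loop breaks: the break closes the
    // intermediate generator, whose for await...of then closes this one.
    console.log("inner cleaned up");
  }
}

async function* intermediate(): AsyncGenerator<number> {
  for await (const value of inner()) {
    yield value * 2;
  }
}

async function main() {
  for await (const value of intermediate()) {
    if (value > 6) {
      break; // triggers intermediate.return(), which triggers inner.return()
    }
    console.log(value);
  }
}

main();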

So, thank you for being so receptive to my suggestions, and congratulations on the great idea you had in making this package!

danstarns commented 4 years ago

@Farenheith Nice 😎 This is very powerful, can't wait for more people to realize it :) Thank you for all your contributions 🍻