rocket-connect / for-emit-of

Turn Node.js Events into Async Iterables.
https://www.npmjs.com/package/for-emit-of
MIT License

Timeout option #3

Closed Farenheith closed 4 years ago

Farenheith commented 4 years ago

Closes #1

To control the timeout, I changed the sleep strategy used in the generator to a new one that uses a promise over listeners on data, error, and end/close.

The timeout is controlled in the wrapper returned by the function in timeout.ts. It uses the sleep method to wait until the moment the timeout must be checked. When that moment comes, the deadline may have been pushed back because a value arrived from the data event; in that case, a new sleep runs until the next deadline.
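
The deadline idea described above can be sketched roughly as follows. This is only an illustration, not the code in timeout.ts: `createDeadlineWatcher`, `touch`, and `expired` are hypothetical names chosen for the example.

```javascript
const sleep = require("util").promisify(setTimeout);

// Hypothetical helper: expired() resolves once `timeout` ms have passed
// without any call to touch().
function createDeadlineWatcher(timeout) {
  let deadline = Date.now() + timeout;

  return {
    // Call this whenever a data event arrives to push the deadline forward.
    touch() {
      deadline = Date.now() + timeout;
    },
    // Sleep until the current deadline. If the deadline moved while we
    // slept, sleep again until the new one.
    async expired() {
      let remaining = deadline - Date.now();
      while (remaining > 0) {
        await sleep(remaining);
        remaining = deadline - Date.now();
      }
      return true;
    },
  };
}
```

A consumer would call `touch()` from its data listener and treat the resolution of `expired()` as the timeout.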

danstarns commented 4 years ago

Hey, thank you for taking the time on this. In terms of the requirements, timeout should be the max time in between each event (my bad, I did not state them!). I don't think we should throw Event timed out until at least 1 event has been yielded. What do you think?

There is an important bit of code that you removed,

/* We do not want to block the process!
This call allows other processes
a chance to execute.
*/
await sleep(0);

Farenheith commented 4 years ago

@danstarns I think it's fair enough! I'll change the code so it complies with the in-between requirement :)

Farenheith commented 4 years ago

@danstarns done! I created an option inBetweenTimeout with a default value of true. I think there may be some use cases where one needs the timeout to be thrown even before the first event.

Farenheith commented 4 years ago

@danstarns about the sleep(0), I replaced it with a new strategy: a promise that resolves when an event is emitted, so I can race the timeout against it. The event emission may be synchronous, so I still keep the buffer array you created to collect those values, and that promise is only used when the array is empty, no error has been thrown, and the event emitter is still active.

So, the equivalent to the sleep is done at this point

if (await Promise.race(getRaceItems())) {
  throw Error("Event timed out");
}
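
For illustration, the race between "an event arrived" and "the timeout elapsed" could look something like the sketch below. It is not the PR's `getRaceItems`; `nextEventOrTimeout` is a hypothetical helper that only assumes a standard Node.js EventEmitter emitting "data".

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: resolves when the next "data" event arrives,
// rejects with "Event timed out" if `timeout` ms pass first.
async function nextEventOrTimeout(emitter, timeout) {
  let timer;
  let onData;

  // Resolves to false as soon as a "data" event is emitted.
  const received = new Promise((resolve) => {
    onData = () => resolve(false);
    emitter.once("data", onData);
  });

  // Resolves to true when the timeout elapses.
  const timedOut = new Promise((resolve) => {
    timer = setTimeout(() => resolve(true), timeout);
  });

  try {
    // Whichever promise settles first decides the outcome.
    if (await Promise.race([received, timedOut])) {
      throw new Error("Event timed out");
    }
  } finally {
    // Clean up so neither the timer nor the listener lingers.
    clearTimeout(timer);
    emitter.removeListener("data", onData);
  }
}
```

The `finally` block matters in a sketch like this: without it, the losing side of the race would leave a pending timer or a stale listener behind.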

Farenheith commented 4 years ago

@danstarns I apologize. I created a new test case to guarantee that the for await over the generated async iterable is nonblocking and I could see the importance of this piece of code. I put it again in the code with the new case

danstarns commented 4 years ago

@Farenheith I like the idea of making it configurable with inBetweenTimeout, although I don't believe it's working as intended.

With the below code I expect it to throw Event timed out

const forEmitOf = require("./dist");
const fs = require("fs");
const path = require("path");
const sleep = require("util").promisify(setTimeout);

async function main() {
  const readStream = fs.createReadStream(
    path.join(__dirname, "./package.json")
  );

  const iterator = forEmitOf(readStream, {
    transform: (buff) => buff.toString(),
    timeout: 1000, // TIMEOUT less than 10 seconds
    inBetweenTimeout: false,
  });

  await sleep(10000); // 10 seconds, longer than TIMEOUT

  for await (const chunk of iterator) {
    console.log(chunk);
  }
}

main();

I'm thinking we could have 2 parameters & this may make it easier to reason about.

We won't need timeout with this:

  1. inBetweenTimeout => max time allowed in between each event
  2. firstEventTimeout => max time allowed from construction to the first event.

interface Options<T = any> {
  inBetweenTimeout?: number;
  firstEventTimeout?: number;
  ...
}
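
One way to read the two proposed options is that exactly one of them applies to any given wait. The sketch below illustrates that reading; `timeoutForNextEvent` and `eventsSeen` are hypothetical names, not part of the library.

```javascript
// Hypothetical helper: returns the limit (in ms) for the next wait,
// or undefined when the relevant option was not provided.
function timeoutForNextEvent(options, eventsSeen) {
  return eventsSeen === 0
    ? options.firstEventTimeout // still waiting for the first event
    : options.inBetweenTimeout; // waiting between two events
}
```

Under this reading each option is checked independently, so setting only firstEventTimeout still limits the wait for the first event.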

Farenheith commented 4 years ago

@danstarns The problem is that the timeout is applied, by the implementation, to the event emission, not the iteration. The event emission starts at the first chance node has to fulfill the fs promises. I put some console logs to illustrate it here, on index.ts:

  const eventListener = <T>(event: T) => {
    console.log("chunk received");
    return events.push(event);
  };
  const endListener = () => {
    console.log("ended");
    active = false;
  };

Here is the code I used to test:

const forEmitOf = require("./dist");
const fs = require("fs");
const path = require("path");
const sleep = require("util").promisify(setTimeout);

async function main() {
  const readStream = fs.createReadStream(
    path.join(__dirname, "./package.json")
  );

  const iterator = forEmitOf(readStream, {
    transform: (buff) => buff.toString(),
    timeout: 1000, // TIMEOUT less than 2 seconds
    inBetweenTimeout: false,
  });
  console.log("before sleep");

  await sleep(2000); // 2 seconds, longer than TIMEOUT

  console.log("before for");
  for await (const chunk of iterator) {
    console.log(chunk.length);
  }
  console.log("after for");
}

main();

The console log result looked like this:

before sleep
chunk received
ended
ended
before for
1779
after for

Note that chunk received was printed before the iteration started, and the chunk was pushed to the events array. The emitter even ended before the iteration, so when the iteration started, all it had to do was iterate over the events array. Also, even if you have an emitter that emits more than once and you do something like this:

  for await (const chunk of iterator) {
    await sleep(2000);
    console.log(chunk.length);
  }

Still, the timeout will not be thrown unless an event emission takes more than 1 second. Don't you think this is desirable behavior? Think about a situation where the user of this library does some slow operations that take longer than the timeout. In this situation, I think the user will expect the timeout to define a deadline for the event emission, without their own code being limited by it.
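
The buffering behavior described here can be reproduced with a much simplified stand-in for the library. This is a sketch only: `bufferEvents` is a hypothetical function, and it assumes an emitter that signals completion with "end".

```javascript
const { EventEmitter } = require("events");
const sleep = require("util").promisify(setTimeout);

// Hypothetical, simplified stand-in: listeners are attached right away,
// so events are buffered even if iteration starts much later.
function bufferEvents(emitter) {
  const events = [];
  let active = true;

  emitter.on("data", (event) => events.push(event));
  emitter.once("end", () => {
    active = false;
  });

  return (async function* () {
    // Drain the buffer first; only wait while the emitter is still active.
    while (events.length > 0 || active) {
      if (events.length > 0) {
        yield events.shift();
      } else {
        await sleep(0); // give the event loop a chance to deliver events
      }
    }
  })();
}
```

If the emitter finishes before the `for await` loop begins, the loop only drains the buffer and never waits on the emitter, which is why a slow consumer does not trip an emission timeout.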

About the changes in the timeout parameters, I think your proposal makes a lot of sense, as it brings more flexibility for the lib user! I'll make the changes and update the PR :).

Farenheith commented 4 years ago

@danstarns I changed the timeout options as you suggested :) Let me know what you think about the timeout behavior with regard to my last comment.

Thanks for your considerations!

Farenheith commented 4 years ago

@danstarns just one more important thing. There is another piece of code I removed, as I found a situation in which it could cause an error. This one:

      if (!event) {
        continue;
      }

The problem I found is that, if an event emitter emits an event with a falsy argument, like undefined, 0 or false, this value would not be yielded. Example:

myEmitter.emit('data', false);
myEmitter.emit('data', 0);
myEmitter.emit('data', '');
myEmitter.emit('data', 'not empty');

Before removing the quoted if, this would only yield the last value. Since I changed it, I think I should create a test case to validate this behavior. I'll create it.

danstarns commented 4 years ago

@Farenheith Hey! Thanks for changing it so quickly 🐱‍👤 I'm still struggling with firstEventTimeout. From your comment, it seems to me that we can't guarantee the execution of firstEventTimeout. Could it cause more issues than it's trying to solve?

I've been playing around with firstEventTimeout and I would expect the below code to throw Event timed out.

const forEmitOf = require("./dist");
const { EventEmitter } = require("events");

async function main() {
  const emitter = new EventEmitter();

  const iterator = forEmitOf(emitter, {
    firstEventTimeout: 1000,
  });

  setTimeout(() => {
    emitter.emit("data", "hi");
  }, 2000);

  for await (const msg of iterator) {
    console.log(msg);
  }
}

main();

In response to the bug you found

Maybe we can check for 'nullish' values null & undefined and allow all others. I think there is a new language feature that could help here 🤔
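
As a rough illustration of the difference between the removed falsy check and a nullish check, consider the sketch below. `isNullish` is a hypothetical predicate, and the thread does not say which language feature was meant.

```javascript
// Hypothetical predicate: true only for null and undefined.
const isNullish = (event) => event === null || event === undefined;

const emitted = [false, 0, "", undefined, null, "not empty"];

// The removed check (`!event`) would skip every falsy value.
const skippedByFalsyCheck = emitted.filter((event) => !event);

// A nullish check skips only null and undefined.
const skippedByNullishCheck = emitted.filter(isNullish);

console.log(skippedByFalsyCheck.length); // 5
console.log(skippedByNullishCheck.length); // 2
```

With the nullish check, false, 0, and the empty string would still be yielded.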

Farenheith commented 4 years ago

@danstarns I actually have time to spare because of this pandemic scenario lol. Such boredom, dude... You found a bug in my implementation, but it's actually simple to fix. I was validating firstEventTimeout only when inBetweenTimeout was provided, my bad. I fixed it and created a new test case to cover it. Take a look at the last commit when you can!

Also, I'm creating a library, mostly for fun, that works much like rxjs but over Iterables and AsyncIterables instead of Observables. I'm quite enthusiastic about it and I want to make it work with EventEmitters too, and I loved the solution you found with for-emit-of; it fits very well with what I was looking for.

danstarns commented 4 years ago

@Farenheith Nice, Glad it works :) Send a link I would love to check it out!

Your commit fixed my issue after I remembered to run npm run build 😂

Getting undesired behavior, expected/actual

const forEmitOf = require("./dist");
const { EventEmitter } = require("events");

async function main() {
  const emitter = new EventEmitter();

  const iterator = forEmitOf(emitter, {
    firstEventTimeout: 1000,
  });

  setTimeout(() => {
    emitter.emit("data", "hi");
  }, 100);

  for await (const msg of iterator) {
    console.log(msg);
  }
}

main();

expected

$ node test
hi

actual

$ node test
hi
(node:25028) UnhandledPromiseRejectionWarning: Error: Event timed out

Farenheith commented 4 years ago

It's this project I talked about. I just sent a commit fixing this scenario you found. Thank you for the care you're having with my implementation as some of these scenarios I was really not covering. I have a special desire for this timeout to work for the first emission, as you may guess lol.

danstarns commented 4 years ago

@Farenheith Nice it's looking good, I can see you have put a lot of time into it

199 commits ahead of kataik:master 😲

I've tested all your updates & I feel they are all working as intended. I'm going to merge this & commit some examples plus additions to the README. v1.1.0 will be released very shortly, thank you for your help 😃

Farenheith commented 4 years ago

The number of commits is not that much lol... I'm just a compulsive committer lol

danstarns commented 4 years ago

@Farenheith me too :) It's the best way :)