piscinajs / piscina

A fast, efficient Node.js Worker Thread Pool implementation
https://piscinajs.github.io/piscina/

Stream example results in MaxListenersExceededWarning #613

Closed eXigentCoder closed 3 weeks ago

eXigentCoder commented 1 month ago

Hey there, #137 might be related. I copied the files from ~/examples/stream, created a basic package.json with piscina as the only dependency, ran npm install, and then tried to run the example. I get the following warning, with no other output to the console:

(node:82892) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 close listeners added to [WriteStream]. Use emitter.setMaxListeners() to increase limit
(Use `node --trace-warnings ...` to show where the warning was created)
import Piscina from "piscina";
import { MessagePortDuplex } from "./stream.mjs";
import { createReadStream } from "fs";
import { pipeline } from "stream";

const pool = new Piscina({
  filename: new URL("./worker.mjs", import.meta.url).href,
});
pool.setMaxListeners(200);

const { port1, port2 } = new MessageChannel();

pool.run(port2, { transferList: [port2] });

const duplex = new MessagePortDuplex(port1);
pipeline(
  createReadStream(new URL("./index.mjs", import.meta.url).pathname),
  duplex,
  process.stdout,
  (err) => {
    if (err) throw err;
  }
);

I tried useAtomics: false as well but get the same error.

I'm still getting to grips with the library, so this might be a misunderstanding on my part, but essentially what I want to do is stream results out of the worker, so this might not even be the right example for my use case 😂

Tried it with both node v16.18.1 and v20.14.0
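
One detail that may be worth checking: the warning names a [WriteStream], while the snippet above raises the limit on the pool. Listener limits are tracked per emitter, so raising one does not change another. The sketch below only illustrates that behaviour using process.stdout and process.stderr as two stand-in emitters; it is an assumption that the stream in the warning is process.stdout, and raising a limit hides the warning rather than explaining where the extra listeners come from.

```javascript
// Sketch: listener limits belong to each emitter individually.
// process.stdout and process.stderr are used only as two convenient emitters.
const stdoutLimit = process.stdout.getMaxListeners();
const stderrLimit = process.stderr.getMaxListeners();

// Raise the limit on one emitter only.
process.stdout.setMaxListeners(stdoutLimit + 10);

console.log("stdout limit:", process.stdout.getMaxListeners());
console.log("stderr limit (unchanged):", process.stderr.getMaxListeners());
```

Running with node --trace-warnings, as the warning itself suggests, should show which code adds the listeners.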

metcoder95 commented 1 month ago

This is somewhat expected. What exactly do you want to stream from the workers? stdout/err?

eXigentCoder commented 1 month ago

So essentially, each worker will receive a noql query, which is similar to SQL. It will parse the query and execute it against a MongoDB server, then for each chunk run some CPU-intensive transforms and stream the results back, allowing us to scale horizontally.

I guess for the example, I was expecting some console output showing how it was using the streams.

metcoder95 commented 1 month ago

For that, you can use this example to see how a stream can be used.

If you send each chunk as a single string that can be printed to the console without parsing, you don't need to pipe it to process.stdout; you can just call console.log on every chunk.
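
A minimal sketch of that idea, assuming each chunk is posted over a MessagePort as its own message. The helper names (sendChunks, logChunks) and the message shapes ({ chunk } and { done }) are made up for illustration and are not part of Piscina's API. Both ends of the channel stay on one thread here so the snippet runs with no dependencies; with a pool, port2 would be transferred to the worker as in the original snippet.

```javascript
// Sketch only: helper names and message shapes are assumptions for illustration.

// Worker side: post each transformed chunk, then a completion marker.
function sendChunks(port, rows, transform) {
  for (const row of rows) {
    port.postMessage({ chunk: transform(row) });
  }
  port.postMessage({ done: true });
}

// Main-thread side: log every chunk as it arrives, close the port when done.
function logChunks(port, log = console.log) {
  port.on("message", (message) => {
    if (message.done) {
      port.close(); // lets the process exit once the stream has finished
      return;
    }
    log(message.chunk);
  });
}

// Demo: both ends live on one thread so the sketch needs no pool.
// With Piscina, port2 would instead go to the worker:
//   pool.run(port2, { transferList: [port2] });
const { port1, port2 } = new MessageChannel();
logChunks(port1);
sendChunks(port2, ["a", "b", "c"], (row) => row.toUpperCase());
```

In a real worker, the transform callback would be the CPU-intensive step, and the worker would call sendChunks with the port it received as its task argument.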

eXigentCoder commented 1 month ago

Thanks so much, will take a look!