piscinajs / piscina

A fast, efficient Node.js Worker Thread Pool implementation

Feature Request: Seamless stream and/or async iterable support #110

Open ronag opened 3 years ago

ronag commented 3 years ago

It would be nice to be able to pass a stream and/or async iterable (of buffers) as an argument and have it seamlessly accessible in the worker (using transferables).

e.g.

// Main thread (proposed usage, not a current API)
const fs = require('fs')

const handle = await fs.promises.open(dst, 'w')
try {
  for await (const buf of worker.runTask(fs.createReadStream(src))) {
    await handle.write(buf)
  }
} finally {
  await handle.close()
}

// Worker (proposed)
module.exports = async function * (source) {
  for await (const buf of source) {
    yield veryExpensiveProcessing(buf)
  }
}
ronag commented 3 years ago

maybe duplicate of https://github.com/piscinajs/piscina/issues/108

ronag commented 3 years ago

@mcollina This would make it possible to do e.g.

await pipeline(piscina.move(src), piscina.transform(workerPath), dst)
jasnell commented 3 years ago

Unfortunately we can't make streams seamlessly transferable/cloneable using move. The only objects that can be made transferable are core objects that extend from BaseObject. It would be possible, though, to create a Transform implementation whose _transform dispatches each chunk to Piscina and continues once the task is done. Something like:

const piscina = new Piscina({ /** ... **/ });
const workerTransform = new MyTransformer(piscina);
await pipeline(src, workerTransform, dst);
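
A minimal sketch of what such a Transform might look like. The class name `PoolTransform` and the stand-in `fakePool` are hypothetical; the only assumption about the pool is that it exposes a `run(task)` method returning a Promise, which is the shape of Piscina's `run()`. The stand-in keeps the sketch runnable without any worker file.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical Transform: every chunk becomes one pool task.
// `pool` is assumed to expose run(task) returning a Promise.
class PoolTransform extends Transform {
  constructor(pool, options) {
    super(options);
    this.pool = pool;
  }

  _transform(chunk, encoding, callback) {
    // The stream does not hand over the next chunk until callback fires,
    // so chunks are processed one at a time and stay in order.
    this.pool.run(chunk).then(
      (result) => callback(null, result),
      (err) => callback(err)
    );
  }
}

// Stand-in pool (an assumption for this sketch) so no workers are needed.
// With Piscina this would be a real pool instance instead.
const fakePool = {
  run: async (chunk) => Buffer.from(chunk.toString().toUpperCase())
};

async function demo() {
  const out = [];
  await pipeline(
    Readable.from([Buffer.from('hello '), Buffer.from('there')]),
    new PoolTransform(fakePool),
    new Writable({
      write(chunk, encoding, callback) {
        out.push(chunk.toString());
        callback();
      }
    })
  );
  return out.join('');
}
```

Because `_transform` waits for each task before accepting the next chunk, this version uses at most one worker at a time. Running several chunks in parallel would need extra bookkeeping to keep the output in order.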

I have been thinking about adding a new type of Stream in core that complements MessagePort to implement a highly efficient cross-thread stream model. Basically something like:

// MessageDuplex is the proposed class; it does not exist in worker_threads today
const { MessageChannel, MessageDuplex } = require('worker_threads')

const mc = new MessageChannel();

mc.port1.onmessage = async ({data}) => {
  for await (const chunk of data)
    data.write(transformIt(chunk));
};

const d = new MessageDuplex();
d.on('data', console.log);

mc.port2.postMessage(d);

d.write('hello');
d.end('there');

The idea here is that MessageDuplex would use highly efficient signaling and no-copy mechanisms to exchange data with the paired clone. It would get very close to what you're looking for here.
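
Since MessageDuplex is only a proposal, here is a rough approximation using APIs that exist today: a Duplex wrapped around one end of a MessageChannel. The class name `PortDuplex` and the `{ chunk }` / `{ done }` message shapes are made up for this sketch. It is not zero-copy on the sending side (each chunk is copied once so its ArrayBuffer can be transferred) and it ignores backpressure, so it only hints at what a core implementation could do better.

```javascript
const { Duplex } = require('node:stream');
const { MessageChannel } = require('node:worker_threads');

// Hypothetical helper, not a Node.js or Piscina API.
class PortDuplex extends Duplex {
  constructor(port) {
    super();
    this.port = port;
    port.on('message', (msg) => {
      if (msg.done) {
        this.push(null); // the peer has finished writing
      } else {
        const { buffer, byteOffset, byteLength } = msg.chunk;
        this.push(Buffer.from(buffer, byteOffset, byteLength));
      }
    });
  }

  _read() {
    // Data is pushed as messages arrive; no backpressure in this sketch.
  }

  _write(chunk, encoding, callback) {
    // Copy into a standalone ArrayBuffer, then transfer that copy.
    const copy = new Uint8Array(chunk);
    this.port.postMessage({ chunk: copy }, [copy.buffer]);
    callback();
  }

  _final(callback) {
    this.port.postMessage({ done: true });
    callback();
  }

  _destroy(err, callback) {
    // Runs once both directions have ended (autoDestroy) or on error.
    this.port.close();
    callback(err);
  }
}

function demo() {
  const { port1, port2 } = new MessageChannel();
  const a = new PortDuplex(port1);
  const b = new PortDuplex(port2);

  return new Promise((resolve, reject) => {
    const received = [];
    b.on('data', (chunk) => received.push(chunk.toString()));
    b.on('end', () => resolve(received.join('')));
    b.on('error', reject);

    a.resume(); // drain a's readable side so it can end
    a.write('hello');
    a.end('there');
    b.end(); // b has nothing to send back
  });
}
```

In a real setup one of the two ports would be transferred to a worker, and the worker would wrap it the same way.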

Bottom line is that the limitations of cloning/transferring over MessagePort definitely impose some restrictions.

jasnell commented 3 years ago

Regarding the pattern below (an async generator as a worker): I'm not sure it could work with the worker pool. Retaining the generator state would be problematic. Not necessarily impossible, but extremely complicated. Will have to think about how we could do it and what the semantics would actually be.

module.exports = async function * (source) {
  for await (const buf of source) {
    yield veryExpensiveProcessing(buf)
  }
}
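
One way to sidestep the generator-state problem, sketched under assumptions: keep the async generator on the calling thread and send only stateless, per-chunk tasks to the pool. The helper name `mapThroughPool`, its `concurrency` parameter, and the stand-in `fakePool` are hypothetical; the pool is only assumed to have a `run(task)` method that returns a Promise.

```javascript
// Hypothetical helper: yields results in input order while keeping up to
// `concurrency` tasks in flight. No generator state lives in a worker.
async function* mapThroughPool(source, pool, concurrency = 4) {
  const pending = [];
  for await (const chunk of source) {
    const task = pool.run(chunk);
    // Avoid an unhandled-rejection crash if a later task fails while an
    // earlier one is still being awaited; the error still surfaces below.
    task.catch(() => {});
    pending.push(task);
    if (pending.length >= concurrency) {
      yield await pending.shift();
    }
  }
  // Source is exhausted: flush whatever is still in flight.
  while (pending.length > 0) {
    yield await pending.shift();
  }
}

// Stand-in pool (an assumption for this sketch) so it runs without workers.
const fakePool = { run: async (n) => n * 2 };

async function demo() {
  const out = [];
  for await (const value of mapThroughPool([1, 2, 3, 4, 5], fakePool, 2)) {
    out.push(value);
  }
  return out;
}
```

The consumer still gets the `for await` loop from the original request, but each worker task stays a plain function of one chunk.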
ronag commented 3 years ago

> I have been thinking about adding a new type of Stream in core that complements MessagePort to implement a highly efficient cross-thread stream model. Basically something like:

I think all of that could be hidden away in a piscina.pipeline() helper, no?

jasnell commented 3 years ago

Possibly, but we still hit up against limitations on MessagePort. Lemme stew over it a bit

jasnell commented 2 years ago

https://github.com/piscinajs/piscina/pull/170 illustrates transferring WHATWG readable and writable streams to an individual worker. It doesn't quite do what this issue is suggesting, but it definitely gets a bit closer!
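
For readers unfamiliar with the mechanism, here is a sketch of transferring a WHATWG ReadableStream over a MessagePort. It shows the general technique only, not the code from that pull request, and both ports stay on one thread to keep it self-contained. In practice the receiving port would live in a worker.

```javascript
const { MessageChannel } = require('node:worker_threads');
const { ReadableStream } = require('node:stream/web');

function demo() {
  const { port1, port2 } = new MessageChannel();

  const readable = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder();
      controller.enqueue(encoder.encode('hello '));
      controller.enqueue(encoder.encode('there'));
      controller.close();
    }
  });

  const result = new Promise((resolve, reject) => {
    // The receiver gets its own ReadableStream fed by the original one.
    port1.once('message', async (stream) => {
      try {
        const reader = stream.getReader();
        const decoder = new TextDecoder();
        let text = '';
        for (;;) {
          const { done, value } = await reader.read();
          if (done) break;
          text += decoder.decode(value, { stream: true });
        }
        resolve(text);
      } catch (err) {
        reject(err);
      } finally {
        port1.close(); // let the process exit
      }
    });
  });

  // Listing the stream in the transfer list moves it to the receiver;
  // the sender can no longer read from `readable` afterwards.
  port2.postMessage(readable, [readable]);
  return result;
}
```

Each chunk is still cloned as it crosses the port, so this is convenient rather than zero-copy.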

github-actions[bot] commented 1 month ago

This issue has been marked as stale because it has been open for 30 days without activity. Remove the stale label or comment, or this will be closed in 5 days.
