soldair / node-forkfriend

worker child process manager. respawn children. load balance work amongst children.
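Roughly, usage looks like the sketch below. The worker path and record shape are illustrative; the .add(script, args, n) signature is taken from the issue that follows, and the pool is itself a stream you write work into and read results out of:

const forkfriend = require('forkfriend');

// the pool is a stream: write records in, read results out.
// 'worker.js' is an illustrative path; .add(script, args, n) spawns n children.
const pool = forkfriend();
pool.add(__dirname + '/worker.js', [], 2);

pool.on('data', (result) => console.log('result:', result));
pool.write({ task: 'example' });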

Piping one "forkfriend" stream to another doesn't work #11

Open · marshall007 opened this issue 7 years ago

marshall007 commented 7 years ago

Given the following setup:

const forkfriend = require('forkfriend');
const pipe = require('multipipe');

const { Transform } = require('stream');

function worker (type, data) {
  const WORKER_PATH = `${__dirname}/workers/${type}.js`;

  // one pool per worker type; spawn 2 children running the worker script
  const pool = forkfriend({ respawnInterval: 0 });
  pool.add(WORKER_PATH, [ JSON.stringify(data) ], 2);

  return pool;
}
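The worker scripts themselves aren't shown here; since forkfriend forks the script as a child process, a hypothetical workers/rules.js would presumably receive records over Node's IPC channel and send results back, something like:

// hypothetical workers/rules.js: a forked child that receives records
// over IPC and sends processed results back to the parent pool
const data = JSON.parse(process.argv[2]); // the args array passed to pool.add()

process.on('message', (record) => {
  // placeholder "rules" logic
  process.send({ ...record, rulesApplied: true });
});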

This stream does not work correctly:

let stream = pipe(
  worker('rules', [ /* data */ ]),
  worker('standards', [ /* data */ ])
)

However, if I pipe to an intermediary no-op Transform stream, everything does work as expected:

let stream = pipe(
  worker('rules', [ /* data */ ]),
  new Transform({
    objectMode: true,
    transform(data, encoding, cb) {
      cb(null, data)
    }
  }),
  worker('standards', [ /* data */ ])
)
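(Presumably the built-in stream.PassThrough would serve as the same no-op shim:)

const { PassThrough } = require('stream');

let stream = pipe(
  worker('rules', [ /* data */ ]),
  new PassThrough({ objectMode: true }), // built-in no-op object stream
  worker('standards', [ /* data */ ])
)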
soldair commented 7 years ago

interesting. i expect this needs some updates; it's fallen quite a bit behind on streams.

marshall007 commented 7 years ago

@soldair I tried forking this project and switching it to use through2 internally, but that seemed to have no effect. Happy to help investigate. Let me know if there's anything I can do.
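(For reference, the through2 equivalent of the no-op shim above is just:)

const through2 = require('through2');

// same no-op object-mode transform as the hand-rolled version above
const noop = through2.obj((record, enc, cb) => cb(null, record));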

marshall007 commented 7 years ago

@soldair to clarify, the behavior when piping to another "forkfriend" stream is that only n records are processed off the stream before the end event is triggered, where n is equal to the number of children specified when calling .add(<script>, <args>, <n>).

In other words, it appears that the stream is closed after each of the child processes in the first stream process a single record.
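A rough repro of what I'm seeing (assuming the pools above were created with .add(<script>, <args>, 2)):

let count = 0;
let stream = pipe(
  worker('rules', [ /* data */ ]),
  worker('standards', [ /* data */ ])
)

stream.on('data', () => count++);
// fires after only 2 records, one per child in the first pool,
// no matter how many records were written upstream
stream.on('end', () => console.log(`saw ${count} records before end`));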

soldair commented 7 years ago

oh. rather curious