mafintosh / duplexify

Turn a writable and readable stream into a streams2 duplex stream with support for async initialization and streams1/streams2 input
MIT License
190 stars 36 forks source link

Catch the status of a child process as a stream error #31

Open tyrak opened 5 years ago

tyrak commented 5 years ago

I am using duplexify to create a duplex stream out of a child process. This enables me to create complex pipelines, combining processes with other streams. So, I wrote a simple function that wraps duplexify and child_process.spawn as follows:

function run(program, args) {
    const child = spawn(program, args, {
        stdio: ["pipe", "pipe", "inherit"],
    });
    const stream = duplexify(child.stdin, child.stdout);
    child.on("error", (err) => {
        stream.destroy(err);
    });
    return stream;
}

Now I can use the stream returned by run with stream.pipeline. The only issue that I have with this solution is that I would like run to wait for the child processes' exit status, and if it is not zero, forward an error to the duplex stream. I tried to address this by adding a final method as follows:

function run(program, args) {
    const child = spawn(program, args, {
        stdio: ["pipe", "pipe", "inherit"],
    });
    const final = (stream, callback) => {
        child.on("exit", (code, signal) => {
            if (code !== 0)
                callback(new Error("child process failed"));
            else
                callback();
        });
    };
    const stream = duplexify(child.stdin, child.stdout, {final});
    child.on("error", (err) => {
        stream.destroy(err);
    });
    return stream;
}

but the final function never gets called by duplexify. Is there a way to solve this issue with duplexify?

istarkov commented 3 years ago

Modern days you can use async iterables instead of duplexify

I use spawn wrapper like

  const someCmd = (cmd:string) => {
    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    let resolve_ = (_value?: unknown) => {
      //
    };

    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    let reject_ = (_e: Error) => {
      //
    };

    const pWait = new Promise((r, rj) => {
      resolve_ = r;
      reject_ = rj;
    });

    const psql = spawn(execCmd, [...execArgs, ...commonArgs, '-c', cmd], {
      stdio: ['pipe', 'pipe', process.stderr],
    });

    psql.on('exit', code => {
      if (code != null && code > 0) {
        // throw new Error('process exited with error');
        console.error('EXIT CODE');
        reject_(new Error('process exited with non null code'));
      }
      resolve_();
    });

    // return stream;
    return async function* (source: NodeJS.ReadableStream) {
      if (source != null) {
        source.setEncoding('utf8');
        pipeline(source, psql.stdin, err => {
          if (err != null) {
            reject_(err);
          }
        });
      }

      for await (const chunk of psql.stdout) {
        yield chunk;
      }

      await pWait;
    } as never;
  };

Then

try {
await pipeline(
  eee(),
  someCmd(),
  blabla()
)
} catch(e) {
  <--- In case of spawn returns error code will be here
}