nodejs / readable-stream

Node-core streams for userland
https://nodejs.org/api/stream.html

How to implement a through stream #527

Open msageryd opened 1 year ago

msageryd commented 1 year ago

Streams are sometimes hard for me to grasp, and the various versions in Node don't make it easier.

It seems like a good idea to use readable-stream so I at least have solid ground to build knowledge on, instead of an ever-changing landscape.

I'm reading a stream of binary files (stdout from a child process); the files are separated by a delimiter. I look at each chunk when it arrives and either just write it to my outputStream or create a new outputStream if a file delimiter is discovered. After that, each new stream needs to be cloned (using cloneable-readable) and piped to different locations.

Originally I had an "outputStreamFactory" which created a writable stream and piped it to its destination. The factory was used to create a new stream every time a file delimiter was discovered. This no longer works, since I need to pipe the stream again (you cannot pipe a writable stream).

Q1: Should I use a through stream for this?
Q2: readable-stream does not have a through stream. Should I build one from a transform stream?
Q3: In that case, how do I safely build a through stream from a transform stream?
Q4: Or should I use through2, which is probably not using readable-stream under the hood?

benjamingr commented 1 year ago

Not sure I understand. Do you mean something like this (no packages; this is pseudocode, not an implementation, just to understand the ask):

myStream.compose(async function*(s) {
  let currentReadable = new Readable();
  for await (const chunk of s) {
    let delimiter = chunk.indexOf("\n");
    if (delimiter !== -1) {
      currentReadable.push(chunk.slice(0, delimiter));
      yield currentReadable;
      currentReadable = new Readable();
      currentReadable.push(chunk.slice(delimiter + 1));
    } else {
      currentReadable.push(chunk);
    }
  }
  yield currentReadable;
}).forEach((readable) => {
  readable.pipe(somewhereDependingOnWhateverLogicYouHave);
});
msageryd commented 1 year ago

Almost.

I don't want to mix my "WhateverLogicYouHave" with this particular code. Instead I'm supplying a handleOutputStream function which will be called at the start and after every new file, i.e. after each file delimiter. The cloning and piping logic will be handled within handleOutputStream.

I'm not sure if it's necessary, but in order to be able to write to outputStream as well as pipe it to other places, I made it a through stream.

The through stream looks like this:

const { Transform } = require('readable-stream');

class MyThrough extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, cb) {
    // No transformation, just pass the data through
    this.push(chunk);
    cb();
  }
}

Also, I'm not awaiting anything. Instead I'm using the data event like this:

child.stdout.on('data', (chunk) => {
  const delimiterIndex = chunk.indexOf(fileDelimiter, 0, 'binary');
  if (delimiterIndex > -1) {
    outputStream.end();
    fileIndex++;
    outputStream = new MyThrough();
    handleOutputStream({ fileIndex, outputStream });
    chunk = chunk.slice(
      delimiterIndex + fileDelimiter.length + 1,
      chunk.length
    );
  }
  outputStream.write(chunk);
});
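
For illustration, a hypothetical handleOutputStream along the lines described above might look roughly like this (a sketch only; the destination and the clone/pipe details are placeholders, not part of my actual code):

const fs = require('fs');

function handleOutputStream({ fileIndex, outputStream }) {
  // The clone/pipe logic lives here; as a placeholder, pipe each per-file
  // through stream to its own destination on disk.
  const destination = fs.createWriteStream(`./output/file_${fileIndex}.bin`);
  outputStream.pipe(destination);
}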
vweevers commented 1 year ago

Or should I use through2, which is probably not using readable-stream under the hood?

through2 has no reason to exist anymore. It does use readable-stream, but it's outdated and merely sugar. There's now an almost equally elegant solution:

const { Transform } = require('readable-stream')

const stream = new Transform({
  transform (chunk, encoding, cb) {
    // ..
  }
})

Which is equivalent to:

const stream = through2(function (chunk, encoding, cb) {
  // ..
})
msageryd commented 1 year ago

Thanks @vweevers. It seems like I almost managed to build a through stream by myself.

I do wonder what your // .. represents. Do you mean that I don't even need to push the chunks when they arrive? (See my code above.)

vweevers commented 1 year ago

I do wonder what your // .. represents.

I just intended to show that the body of the function is equal between the two code snippets. Fully written out, it's:

const { Transform } = require('readable-stream')

const stream = new Transform({
  transform (chunk, encoding, cb) {
    cb(null, chunk)
  }
})

If you need exactly that, then you can also use the PassThrough utility:

const { PassThrough } = require('readable-stream')

const stream = new PassThrough()
benjamingr commented 1 year ago

I tested the code and fixed it. Here is a "native" solution; it can be simplified much further, but it has the advantage of only reading the "files" as they are needed.

const { Readable, PassThrough } = require("stream");

const arr = Uint8Array.from([
    ...(Array(10000).fill(1)),
    10,
    ...(Array(10000).fill(2)),
    10,
    ...(Array(10000).fill(3))
]);

let streams = Readable.from([arr, arr], { objectMode: false }).compose(async function* (stream) {
    let currentReadable = new PassThrough();
    for await (let chunk of stream) {
        let delimiter = chunk.indexOf(10);
        let found = delimiter !== -1;
        while (delimiter !== -1) {
            currentReadable.push(chunk.slice(0, delimiter)); 
            yield currentReadable;
            chunk = chunk.slice(delimiter + 1);
            currentReadable = new PassThrough();
            currentReadable.push(chunk);
            delimiter = chunk.indexOf("\n");
        }
        if (!found) {
            currentReadable.push(chunk);
        }
    }
    yield currentReadable;
});

// sometimes later
streams.forEach(async s => {
    console.log("Got stream, do whatever with it, pass it to wherever");
});

This can be made faster and the while loop can likely be simplified a lot.
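
For what it's worth, a sketch of one possible simplification of that loop (assumptions: a single-byte delimiter, each per-file stream is ended explicitly, and, as in the snippet above, backpressure on the PassThrough buffers is ignored):

const { PassThrough } = require('stream');

async function* splitOnDelimiter(source, delimiterByte = 10) {
  let current = new PassThrough();
  for await (let chunk of source) {
    let index = chunk.indexOf(delimiterByte);
    while (index !== -1) {
      current.push(chunk.slice(0, index)); // bytes before the delimiter belong to the current file
      current.push(null);                  // end this per-file stream
      yield current;
      current = new PassThrough();
      chunk = chunk.slice(index + 1);      // continue with the rest of the chunk
      index = chunk.indexOf(delimiterByte);
    }
    if (chunk.length) current.push(chunk); // no further delimiter in this chunk
  }
  current.push(null);
  yield current;
}

// e.g. streams = Readable.from([arr, arr]).compose((source) => splitOnDelimiter(source, 10))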

msageryd commented 1 year ago

@benjamingr Wow, that is so awesome. I couldn't have dreamt up that code. Just as I thought I had a little grasp on streams, I realise I have a long way to go. Compose seems like a powerful tool. I need to read up.

As I'm only looking for a specific file delimiter, I switched your last chunk.indexOf("\n") to a delimiter search. I also added delimiter.length to the second slice: chunk = chunk.slice(delimiterIndex + fileDelimiter.length + 1).

Other than that, the code works great. Next step is to try to clone those streams. I had trouble with this the last time I tried (https://github.com/mcollina/cloneable-readable/issues/44).

@vweevers Thank you for pointing out that PassThrough is available as a utility in readable-stream. I didn't find it in the docs, so I assumed it was not there.

benjamingr commented 1 year ago

We are creating these new streams in our "stream of streams"; why do you need to further clone them?

msageryd commented 1 year ago

Each stream is a separate file. Each file needs to be streamed through different transforms and end up in different places.

Example:

delimitedFileStream -> file1 -> S3
                             -> resizeThumbnail -> localDisk
                                                -> S3

                    -> file2 -> S3
                             -> resizeThumbnail -> localDisk
                                                -> S3

The two files extracted from delimitedFileStream will end up in three writeStreams each.

Resizing is performed with Sharp (https://github.com/lovell/sharp).
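
For what it's worth, a readable can be piped to more than one destination, so a rough sketch of this fan-out could look like the following (local write streams stand in for the S3 uploads, which are assumptions and not part of my actual code):

const fs = require('fs');
const sharp = require('sharp');

function fanOutFile(fileStream, fileIndex) {
  // A readable can be piped to several writables; each one receives all of the
  // data, and the slowest destination dictates the pace.

  // Branch 1: the original file, e.g. straight to S3 (a local file stands in here).
  fileStream.pipe(fs.createWriteStream(`./out/file_${fileIndex}.jpg`));

  // Branch 2: resize with sharp, then fan the thumbnail out to two destinations.
  const thumbnail = fileStream.pipe(sharp().resize(200));
  thumbnail.pipe(fs.createWriteStream(`./out/thumb_${fileIndex}_local.jpg`)); // e.g. local disk
  thumbnail.pipe(fs.createWriteStream(`./out/thumb_${fileIndex}_s3.jpg`));    // e.g. the S3 upload
}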

msageryd commented 1 year ago

Is there any "best practice" on how to get hold of and marchal error code out of a stream generator as above?

My original stream is stdout from a child process. I need to capture both the exit code and stderr from the child process and somehow marchal these to the caller.

I tried to wrap the code within another promise, but I don't think child.close will ever be reached since the streames are not started at this stage.

async function pdfiumCommand({ command, input, options = {} }) {
  try {
    return new Promise((resolve, reject) => {
      const child = spawnPdfiumProcess({ command, input, options });

      let fileStreams = child.stdout.compose(async function* (stream) {
        let currentReadable = new PassThrough();
        for await (let chunk of stream) {
          let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
          while (delimiterIndex !== -1) {
            currentReadable.push(chunk.slice(0, delimiterIndex));
            yield currentReadable;

            chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
            currentReadable = new PassThrough();
            currentReadable.push(chunk);
            delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
          }
          if (delimiterIndex === -1) {
            currentReadable.push(chunk);
          }
        }
        yield currentReadable;
      });

      let stderr = '';
      // child.stderr.on('data', (data) => {
      //   stderr += data.toString();
      // });

      child.on('close', (code) => {
        resolve({
          fileStreams,
          stderr,
          code,
        });
      });
    });
  } catch (e) {
    reject(new Error(code || 'No error code returned from "pdfium"'));
  }
}

edit: I see that my try-catch is out of alignment due to bad refactoring, but the point of this post is clear anyway, I think.

msageryd commented 1 year ago

Frustrating... I'm not able to catch my error. I probably lack understanding of error handling with generators and/or streams.

My last attempt was to throw an error if the exit code from the child process is not 0.

async function pdfiumCommand({ command, input, options = {} }) {
  const child = spawnPdfiumProcess({ command, input, options });

  let fileStreams = child.stdout.compose(async function* (stream) {
    let currentReadable = new PassThrough();
    for await (let chunk of stream) {
      let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      while (delimiterIndex !== -1) {
        currentReadable.push(chunk.slice(0, delimiterIndex));
        yield currentReadable;

        chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
        currentReadable = new PassThrough();
        currentReadable.push(chunk);
        delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      }
      if (delimiterIndex === -1) {
        currentReadable.push(chunk);
      }
    }
    yield currentReadable;
  });

  child.on('close', (code) => {
    if (code !== 0) {
      throw new Error('PDFium error: ' + code);
    }
  });

  return fileStreams;
}

My application crashes and I can see the correct code in the crash log, but I'm not able to catch the error. I even tried pump (https://www.npmjs.com/package/pump) in the hope of catching the error in the callback, but I can't even get my "Pump finished" message when there are no errors.

const fileStreams = await pdfiumExplode2ToStream({
  input,
  options,
});

let i = 0;
fileStreams.forEach(async (fileStream) => {
  i++;
  console.log('Processing file ' + i);
  const outputPath = `../test/tmp`;
  const outputPdf = fs.createWriteStream(`${outputPath}/file_${i}.pdf`);

  try {
    pump(fileStream, outputPdf, function (err) {
      if (err) {
        console.log('error from pump callback');
        console.log(err);
      } else {
        console.log('Pump finished');
      }
    });
  } catch (err) {
    console.log('error from try-catch');
    console.log(err);
  }
});
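
As an aside, the built-in stream.pipeline utility covers the same ground as pump; a sketch of the equivalent call, under the same assumptions as the snippet above:

const { pipeline } = require('stream');

pipeline(fileStream, outputPdf, (err) => {
  if (err) {
    console.log('error from pipeline callback');
    console.log(err);
  } else {
    console.log('Pipeline finished');
  }
});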
benjamingr commented 1 year ago

compose creates a stream, and my code returns a stream of streams (they're already copied), so there is no need to wrap it in new Promise; you can for await it directly.
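
In other words, something like this (a sketch; fileStreams here stands for the composed stream of streams from the earlier snippets):

const fs = require('fs');

// inside an async function (or with top-level await)
let fileIndex = 0;
for await (const fileStream of fileStreams) {
  fileIndex++;
  fileStream.pipe(fs.createWriteStream(`../test/tmp/file_${fileIndex}.pdf`));
}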

msageryd commented 1 year ago

I understood that. I just tried to solve my error reporting problem with an outer promise. This was a bad idea. I also tried to throw an error (my last post), but I can't find any way to catch this error.

benjamingr commented 1 year ago

How is your child process signaling that it had an error? Does it terminate with a different exit code? Write to stderr? Processes don't uniformly distinguish "I exited early" from "I exited with an error" from "I finished".

benjamingr commented 1 year ago

Anyway, you can .destroy(err) your fileStreams based on that, and similarly propagate it to the per-file sub-streams if you need to.
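
A rough sketch of that idea, assuming (hypothetically, this is not in the code above) that each PassThrough created inside the generator is also pushed into an array before it is yielded:

// Hypothetical: the generator also does perFileStreams.push(currentReadable)
// for every PassThrough it creates, so they can be reached from here.
const perFileStreams = [];

child.on('close', (code) => {
  if (code !== 0) {
    const err = new Error('PDFium error: ' + code);
    fileStreams.destroy(err);                        // error the outer stream of streams
    for (const s of perFileStreams) s.destroy(err);  // and every per-file stream handed out so far
  }
});

Consumers of the per-file streams would then see the error on their 'error' event (or in pump's callback).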

msageryd commented 1 year ago

I'm getting both a non-zero exit code and a message on stderr if something goes wrong in the child process. But I'm not able to propagate the error up the chain.

I tried this, but it didn't work:

  child.on('close', (code) => {
    if (code !== 0) {
      fileStreams.destroy(new Error('PDFium error: ' + code));
    }
  });
msageryd commented 1 year ago

I also tried to destroy each inner stream.

  child.on('close', (code) => {
    if (code !== 0) {
      fileStreams.forEach(async (fileStream) => {
        fileStream.destroy(new Error('PDFium error: ' + code));
      });

      fileStreams.destroy(new Error('PDFium error: ' + code));
    }
  });
msageryd commented 1 year ago

I found one (ugly) workaround. It's not a neat solution, but at least I can get hold of the error message from stderr.

The first stream in the composed "bundle" is now stderr. I'm sure there must be better ways to solve this.

  let errorStream = new PassThrough();

  let fileStreams = child.stdout.compose(async function* (stream) {
    yield errorStream;
    let currentReadable = new PassThrough();

    for await (let chunk of stream) {
      let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      while (delimiterIndex !== -1) {
        currentReadable.push(chunk.slice(0, delimiterIndex));
        yield currentReadable;

        chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
        currentReadable = new PassThrough();
        currentReadable.push(chunk);
        delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      }
      if (delimiterIndex === -1) {
        currentReadable.push(chunk);
      }
    }
    yield currentReadable;
  });

  child.stderr.on('data', (data) => {
    errorStream.push(data);
  });
msageryd commented 1 year ago

@benjamingr I don't understand what's going on. It would be awesome if I could use your concept, but something is fishy. I suspect that the streams start too early, so they might have ended before I try to use them.

In my example I have only one file in my composed file streams.

The following works fine:

jpgFileStream.forEach(async (fileStream) => {
  const outFile = fs.createWriteStream(`../test/tmp/fromGenerator.jpg`);
  fileStream.pipe(outFile);
});

As soon as I introduce a transform stream (sharp) I get empty files in the output.

jpgFileStream.forEach(async (fileStream) => {
  const outFile = fs.createWriteStream(`../test/tmp/fromGenerator.jpg`);
  const sharpPipeline = sharp().rotate(90);
  fileStream.pipe(sharpPipeline).pipe(outFile);
});

Sometimes I get "cut off" JPEG files like this, which leads me to believe that the streams have already started:

[screenshot: partially rendered JPEG]

The sharpPipeline works fine if I get the input directly from a readable file stream.

const sharpPipeline = sharp().rotate(90);
const inFile = fs.createReadStream('../test/resources/test.jpg');
const outFile = fs.createWriteStream(`../test/tmp/fromLocalDisk.jpg`);
inFile.pipe(sharpPipeline).pipe(outFile);
msageryd commented 1 year ago

Update: This has nothing to do with sharp.

Whenever my child process takes a bit longer to produce the stream (i.e. more complicated processing), the destination write stream seems to end prematurely, i.e. I get empty or sometimes half-written files on disk.

It's almost as if the for await (let chunk of stream) loop does not actually await the chunks from stdout.

I went back to my original solution and tried the exact same files and file processing. It works great. The main difference is that I provide a handleOutputStream function which is called after each new through stream is created, i.e. I handle each stream immediately instead of collecting them into a bunch of streams first.

I suspect that this packaging of streams in streams either has some design problem or maybe hits a stream bug. Compose is still marked as experimental in the docs.