caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

_(readable).pipe(writable) does not destroy readable when writable closes #691

Open richardscarrott opened 4 years ago

richardscarrott commented 4 years ago

I know native Node streams do not exhibit this behaviour either, but they provide pipeline (AKA pump) to ensure proper cleanup. I wondered what the equivalent pattern is for highland.

My particular use case is piping a readable stream to a Node.ServerResponse, e.g.

const express = require('express');
const highland = require('highland');
const { pipeline } = require('stream');

const app = express();

app.get('/highland', (req, res) => {
  const readable = createNodeReadableStream();
  _(readable).pipe(res);
  res.destroy();
  setInterval(() => {
    console.log(readable.destroyed); // Always false, even when `res` is closed šŸ˜”
  }, 1000);
});

app.get('/node-stream', (req, res) => {
  const readable = createNodeReadableStream();
  pipeline(readable, res, (ex) => {
    ex && console.error(ex);
  });
  res.destroy();
  setInterval(() => {
    console.log(readable.destroyed); // Becomes true when `res` closes šŸ™‚
  }, 1000);
});

If res is destroyed, either explicitly as above or, for example, by the browser cancelling the request, readable is not destroyed and therefore any cleanup code never runs -- it will keep producing data until its buffer reaches the highWaterMark.

vqvu commented 4 years ago

I haven't tried this myself, but does converting the Highland stream to a Node Readable via toNodeStream() and then using pipeline work?

toNodeStream() is meant to be the escape hatch for any Node stream interop that isn't directly supported.


richardscarrott commented 4 years ago

Unfortunately not; however, keeping readable and writable outside of highland by composing transforms with _.pipeline seems to work, e.g.

const _ = require('highland');
const { pipeline } = require('stream');
const readable = createNodeReadableStream();
const transform = _.pipeline(
  _.filter((chunk) => !chunk.includes('foo')),
  _.map((chunk) => `::${chunk}::`)
);
pipeline(readable, transform, res, (ex) => {
  if (ex) {
    throw ex;
  }
  console.log('DONE');
});

As an aside, I noticed the Stream methods (Stream.prototype.map / Stream.prototype.filter, etc.) return a new Stream instance, whereas _.map and _.filter appear to be composed into a single Stream by _.pipeline; are they functionally any different?

richardscarrott commented 4 years ago

Although I imagine you'd lose a lot of expressiveness by avoiding creating readable streams in highland, so it's not an ideal solution. e.g.

const readFile = _.wrapCallback(fs.readFile);
const fileNames = ['foo.js', 'bar.js'];
const readable = _(fileNames).map(readFile).sequence();
// The `readFile` streams are not destroyed when `res` is closed šŸ˜”
pipeline(readable, res, (ex) => { console.log('DONE', ex) });

I guess you could manually track them, e.g.

let readFileStreams = [];
const readable = _(fileNames).map(readFile).tap((s) => readFileStreams.push(s)).sequence().pipe(res);
res.on('close', () => {
    readFileStreams.forEach((s) => s.destroy())
})

But that's not too pretty...

vqvu commented 4 years ago

Now that I think about it, _(readable) uses pipe under the hood, so of course it wouldn't work...

Making this work correctly is actually pretty difficult. Highland streams are lazy-by-default, so the code doesn't do a good job of propagating destroy events back up the pipeline---there's not usually a need to if the values are generated lazily. We've tried to make this work before, but the complexity of propagating destroy events up through forks in the pipeline (i.e., merge and fork) caused the effort to stall out.

For now, I would say that you should do what you suggest in https://github.com/caolan/highland/issues/691#issuecomment-573352272 and use _.pipeline alongside stream.pipeline to destroy the original stream. You can do this without sacrificing the chaining syntax by structuring your code like so:

const _ = require('highland');
const { pipeline } = require('stream');

function transformReadable(stream) {
  // In this function, you can pretend like you have a regular Highland stream that's
  // correctly wrapping the Readable.
  return stream
      .filter((chunk) => !chunk.includes('foo'))
      .map((chunk) => `::${chunk}::`);
}

const readable = createNodeReadableStream();
pipeline(readable, _.pipeline(transformReadable), res, (ex) => {
  if (ex) {
    throw ex;
  }
  console.log('DONE');
});

The above won't work if you need to merge multiple Readables or fork into multiple Writables. In those cases, you'll need to manage stream destruction manually.


I noticed the Stream methods (Stream.prototype.map / Stream.prototype.filter etc.) return a new Stream instance, whereas _.map, _.filter appear to be composed into a single Stream with _.pipeline; are they functionally any different?

Functionally, they're the same. However, semantically, you should think of _.pipeline as returning a Node Duplex stream that encapsulates all of the transforms that you passed to _.pipeline (even though the current implementation actually returns a Highland stream that pretends to be a Duplex stream).

richardscarrott commented 4 years ago

Making this work correctly is actually pretty difficult. Highland streams are lazy-by-default, so the code doesn't do a good job of propagating destroy events back up the pipeline---there's not usually a need to if the values are generated lazily. We've tried to make this work before, but the complexity of propagating destroy events up through forks in the pipeline (i.e., merge and fork) caused the effort to stall out.

I had a (naive) go at implementing it just so I could understand the difficulties a bit better if you want to take a look -- https://github.com/caolan/highland/pull/692