caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Piped streams not flowing with batch #617

Status: Open. Sintendo opened this issue 7 years ago.

Sintendo commented 7 years ago

We're dealing with an issue where piping streams through a Highland batched stream causes the pipeline to halt. The real code streams data over HTTP, but I think I've managed to narrow it down to the following repro.

```javascript
let {Transform, Writable} = require('stream');
let _ = require('highland');

// Dummy transform
class NullTransform extends Transform {
  constructor(options) {
    options = options || {};
    options.writableObjectMode = true;
    options.readableObjectMode = true;
    super(options);
  }
  _transform(chunk, encoding, callback) {
    console.log('transform', typeof chunk);
    this.push(chunk);
    callback();
  }
  _flush(callback) {
    callback();
  }
}
// Dummy writer
class NullWriter extends Writable {
  constructor(options) {
    options = options || {};
    options.objectMode = true;
    // options.highWaterMark = 1;
    super(options);
    this._storage = [];
  }

  _write(chunk, encoding, callback) {
    console.log('write', chunk, typeof chunk, chunk.length);
    callback();
  }
}

// Infinite stream of random numbers
const readStream = _({
  next() {
    console.log('next!');
    return {
      done: false,
      value: Math.random(),
    };
  },
});
const transform = new NullTransform();
const writer = new NullWriter();
```

This will cause the application to exit:

```javascript
readStream
  .pipe(_().batch(1000))
  .pipe(writer);
```

Strangely, adding the dummy transform before the batching causes it to flow properly:

```javascript
readStream
  .pipe(transform)
  .pipe(_().batch(1000))
  .pipe(writer);
```

Am I missing something?

vqvu commented 7 years ago

At first glance, the problem looks like `pipe(_().batch(1000))`. The only Highland streams you can pipe into are the ones returned by `_()` or `_.pipeline(...)`. Calling `batch` returns a new stream that is read-only, so you can't pipe into it.

It's rare that you'll need to use `_()` at all. Here, since `readStream` is already a Highland stream, you can call `batch` on it directly:

```javascript
readStream
  .batch(1000)
  .pipe(writer);
```
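To make the grouping rule concrete: `batch(n)` collects values into arrays of `n` items, and when the source ends, any leftover items come out as a final, shorter array. The generator below is a minimal sketch of that rule only, assuming a finite synchronous iterable. `batchOf` is a hypothetical helper, not part of Highland, and it models none of Highland's laziness or back-pressure.

```javascript
// Hypothetical helper illustrating batch(n)-style grouping on a plain iterable.
// Not Highland's implementation: no streams, no back-pressure.
function* batchOf(iterable, n) {
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError('batch size must be a positive integer');
  }
  let current = [];
  for (const value of iterable) {
    current.push(value);
    if (current.length === n) {
      yield current; // a full batch of exactly n items
      current = [];
    }
  }
  if (current.length > 0) {
    yield current; // leftover items once the source is exhausted
  }
}

// Five items in batches of two: [1, 2], [3, 4], then the short batch [5].
console.log([...batchOf([1, 2, 3, 4, 5], 2)]);
```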

If `readStream` is actually a Node stream (since you mentioned HTTP streaming), you can wrap it in a Highland stream using the Highland stream constructor:

```javascript
_(readStream)
  .batch(1000)
  .pipe(writer);
```
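If you would rather keep a batching step in the middle of a chain of plain Node `.pipe()` calls, you need a stage that is writable. The stream returned by `batch` is not writable. The sketch below swaps in a Node-only technique: an object-mode `Transform` that buffers incoming objects and pushes arrays of up to `size` items, plus a final shorter array in `_flush`. `BatchTransform` is a hypothetical name, not a Highland or Node API. The example assumes Node 12+ for `Readable.from`.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Hypothetical object-mode Transform that groups incoming objects into
// arrays of up to `size` items. Because it is a Transform (Duplex), it is
// writable and can sit in the middle of a Node pipe chain.
class BatchTransform extends Transform {
  constructor(size, options = {}) {
    super({ ...options, objectMode: true });
    this.size = size;
    this.buffer = [];
  }

  _transform(chunk, encoding, callback) {
    this.buffer.push(chunk);
    if (this.buffer.length >= this.size) {
      this.push(this.buffer); // emit a full batch
      this.buffer = [];
    }
    callback();
  }

  _flush(callback) {
    if (this.buffer.length > 0) {
      this.push(this.buffer); // emit the final, possibly shorter, batch
      this.buffer = [];
    }
    callback();
  }
}

// Usage: five numbers in, batches of two out.
pipeline(
  Readable.from([1, 2, 3, 4, 5]),
  new BatchTransform(2),
  new Writable({
    objectMode: true,
    write(batch, encoding, callback) {
      console.log('write', batch);
      callback();
    },
  }),
  (err) => {
    if (err) console.error('pipeline failed', err);
  }
);
```

Using `pipeline` instead of chained `.pipe()` calls forwards errors from any stage to the final callback. For a source that is already a Highland stream, calling `batch` directly, as suggested above, is simpler.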

I don't know why the code you showed works when you add the dummy transform, but it only works by coincidence.