caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Dynamic piping #316

Open kharandziuk opened 9 years ago

kharandziuk commented 9 years ago

Is it possible to solve the problem below with highland.js?

Consider the problem: read a large file as a stream and split it into several smaller files of a fixed maximum size.

For example, if I have a file that weighs 4 GB and the split size is 1 GB, the result should be four files of 1 GB each.

LewisJEllis commented 9 years ago

Without any additional information this is a little tricky, but there are reasonable ways to do it using only core highland functions if you either bend the rules a bit or know a little more about the input. Assuming neither is the case, your best bet would be to handle the file rotation using either a consume handler or a custom writable stream; a rough sketch of the latter is below.

edit: looks like this is an implementation of the special writable stream I mentioned; using it as your pipe target would do what you're looking for.
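
For concreteness, here's a rough sketch of what such a custom writable stream could look like. The names and the size-tracking details here are mine, for illustration only, not from any particular library:

var fs = require('fs');
var stream = require('stream');
var util = require('util');

// Hypothetical sketch: a Writable that rotates to a new numbered output
// file whenever the next chunk would push the current file past maxSize.
function RotatingWriteStream(prefix, maxSize) {
    stream.Writable.call(this);
    this.prefix = prefix;
    this.maxSize = maxSize;
    this.index = 0;
    this.written = 0;
    this.current = fs.createWriteStream(this.prefix + this.index);
}
util.inherits(RotatingWriteStream, stream.Writable);

RotatingWriteStream.prototype._write = function (chunk, encoding, callback) {
    if (this.written + chunk.length > this.maxSize) {
        this.current.end();
        this.index += 1;
        this.written = 0;
        this.current = fs.createWriteStream(this.prefix + this.index);
    }
    this.written += chunk.length;
    // Defer the callback until the underlying stream flushes, which
    // propagates back-pressure to whatever is piping into us.
    this.current.write(chunk, callback);
};

// Usage:
// fs.createReadStream('large.txt').pipe(new RotatingWriteStream('small_', 1024 * 1024));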

kharandziuk commented 9 years ago

Hi @LewisJEllis, I believe I found a solution without any additional libraries: http://stackoverflow.com/questions/30591188/dynamic-piping-with-frp

vqvu commented 9 years ago

Here's a general corecursive implementation using consume that doesn't require you to create a new fs stream for every buffer. It relies on the guarantee that calling next inside the consume handler never causes a reentry of the handler.

var _ = require('highland');

function splitStream(maxSize) {
    return function (s) {
        var size = 0;

        // Hand-off cells between the parent's consume handler and the
        // generator of the current child stream.
        var err = null;
        var buf = null;
        var yieldToChild = null;   // child's `next`, set while the child waits for data
        var yieldToParent = null;  // parent's `next`, set while the source has more data

        return s.consume(function (_err, _buf, push, next) {
            yieldToParent = null;

            if (_buf === _.nil) {
                // Source exhausted; end the stream of child streams.
                push(null, _.nil);
            } else {
                yieldToParent = next;
            }

            // Stash the event for the child to pick up.
            err = _err;
            buf = _buf;

            if (yieldToChild) {
                // A child is waiting; resume it so it can push the event.
                yieldToChild();
            } else if (_buf !== _.nil) {
                // No active child: open a new one starting with this chunk.
                size = 0;
                push(null, newChild());
            }
        });

        function newChild() {
            return _(function (push, next) {
                yieldToChild = null;

                // Forward the stashed event into this child.
                push(err, buf);

                if (err) {
                    // Errors don't count toward the size limit; keep reading.
                    yieldToChild = next;
                } else if (buf !== _.nil) {
                    size += buf.length;
                    if (size >= maxSize) {
                        // Limit reached: end this child. The parent will
                        // open a new one for the next chunk.
                        push(null, _.nil);
                    } else {
                        yieldToChild = next;
                    }
                }

                err = null;
                buf = null;

                if (yieldToParent) {
                    // Let the parent pull the next event from the source.
                    yieldToParent();
                }
            });
        }
    };
}

To use it:

var fs = require('fs');
var _ = require('highland');

_(fs.createReadStream('large.txt'))
    .through(splitStream(1024 * 1024))
    // Pair each child stream with an incrementing index. scan emits its
    // seed first, so drop(1) discards it.
    .scan({i: -1, stream: null}, function (acc, stream) {
        return {
            i: acc.i + 1,
            stream: stream,
        };
    })
    .drop(1)
    .each(function (s) {
        s.stream.pipe(fs.createWriteStream('small_' + s.i + '.txt'));
    });

You should get a bunch of files, numbered from 0, that are 1 MB each.

LewisJEllis commented 9 years ago

I was scribbling up a consume-based solution, but I wasn't managing to use .pipe and so had to roll my own back-pressure handling, which is always annoying. I like @vqvu's solution, so I'll just leave a couple of comments:

@kharandziuk while that works, I personally think it's craftier than it needs to be, and with all the extra state being passed around there's probably a measurable amount of overhead. Also, by using wrapCallback with fs.WriteStream.write, you're ignoring the destination's highWaterMark and simply waiting for each chunk to be flushed before you read any further. I'm not sure whether that causes a measurable slowdown, but it certainly doesn't help.
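
For reference, the pattern in question looks roughly like this; this is a paraphrase of the idea, not the SO answer's exact code:

var fs = require('fs');
var _ = require('highland');

var out = fs.createWriteStream('out.txt'); // stand-in destination

// Wrapping write in a callback-stream means the pipeline waits for every
// chunk to flush before pulling the next one, regardless of the
// destination's highWaterMark.
var writeChunk = _.wrapCallback(function (chunk, callback) {
    out.write(chunk, callback);
});

_(fs.createReadStream('large.txt'))
    .flatMap(writeChunk)
    .done(function () { out.end(); });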

@vqvu at first glance I also suspected that @kharandziuk's solution was creating a new fs stream for every buffer, but it isn't; nextStream() is only invoked when acc.size > MAX_SIZE.

Finally, just for contrast: doing this with logrotate-stream takes two lines, avoids reinventing the wheel, performs roughly the same as far as I can tell, and doesn't even need Highland:

var fs = require('fs');
var logrotate = require('logrotate-stream');

// in practice you'd use fs.stat and a little math to compute keep instead of hardcoding 650, but you get the idea
var rotator = logrotate({ file: './out.txt', size: '10m', keep: 650 });

fs.createReadStream('walmart.dump').pipe(rotator);

vqvu commented 9 years ago

@LewisJEllis Oh, you're right. Oops.

@kharandziuk I pretty much agree with @LewisJEllis. Just use logrotate-stream, unless what you want to do isn't writing to a bunch of files. Also, now that I look at it more closely, I don't think your solution actually works most of the time.

You overwrite acc.stream and acc.buffer in the scan handler and use parallel(4). Unless the fs write stream is really fast, you're probably writing four copies of every fourth buffer and dropping the rest on the floor. Not to mention that if your buffers are really large (> 0.25 * MAX_SIZE), you'll end up skipping whole files. You really should be creating new objects every time in scan; see the sketch below. You could also skip the parallel and just use flatMap, but that's no better than a synchronous write to disk, since you'd be waiting for each write to complete before doing more work.
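
A minimal illustration of the scan point, separate from the file-splitting code:

var H = require('highland');

// Returning a fresh object from the scan handler means later steps of the
// reduction can never clobber a value a downstream consumer is still
// holding, which matters once consumers run concurrently (e.g. parallel).
H([1, 2, 3])
    .scan({sum: 0}, function (acc, x) {
        return {sum: acc.sum + x}; // new object, not acc.sum += x
    })
    .each(H.log);
// => {sum: 0}, {sum: 1}, {sum: 3}, {sum: 6}  (scan emits its seed first)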

Finally, I assume the following is a typo? In your code, parallel and flatMap both consume from the same stream, s, which is not allowed.

s.parallel 4
s.flatMap((x) ->
  debug('flatMap')(x)
  x.stream(x.buffer)
)
.done -> debug('profile')('finish')
kharandziuk commented 9 years ago

@vqvu No, it isn't a typo. It works, and it looks like there is no other way to make it work. Consider the two samples below:

H = require('highland')

readS = H([1...10])

s = readS.map(
  H.add(2)
)
s.parallel(2) # It works
s.each((el)->
  console.log(el)
)

and

H = require('highland')

readS = H([1...10*3])

s = readS.map(
  H.add(2)
)
s.parallel(2).each((el)-> # it doesn't work
  console.log(el)
)

The error in the second case is TypeError: undefined is not a function.

vqvu commented 9 years ago

parallel only works on streams of streams, so the second case not working is the correct behavior (though we should probably throw a more descriptive error in this case). The following code will not work as expected.

var s = H([1, 2, 3, 4]).map(H);
s.parallel(2);
s.each(H.log);
// => A bunch of streams instead of [1, 2, 3, 4].

The first case "works" because Highland is lazy: parallel will not consume from s until you ask it for data, which you never do, because you immediately throw the parallel stream away. So the parallel ends up doing nothing. This, on the other hand, won't work and will throw an error:

var s = H([1, 2, 3, 4]);
s.map(H.add(2));
s.each(H.log);

It is undefined behavior to consume from a stream multiple times without choosing a back-pressure strategy (fork or observe).
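
For completeness, here's a minimal sketch of the two sanctioned ways to read the same stream twice:

var H = require('highland');

var s = H([1, 2, 3, 4]);

// fork() gives each consumer back-pressure: the source only advances when
// every fork demands the next value, so nothing is lost or duplicated.
var a = s.fork().map(H.add(10));
var b = s.fork();

a.each(H.log); // nothing flows until b is consumed as well
b.each(H.log);

// Alternatively, s.observe() returns a passive tap that buffers values
// and never slows the source down.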

Edit: What you probably wanted is a map followed by a parallel (instead of a separate parallel and flatMap):

s.map(function (x) {
    debug('map')(x);
    return x.stream(x.buffer);
})
.parallel(4)
.done(function () {
    debug('profile')('finish');
});