caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Transducers #212

apaleslimghost closed this issue 9 years ago

apaleslimghost commented 9 years ago

As you probably know, I'm hugely in favour of reducing the surface area of this library. In particular, many of our transforms actually have nothing to do with streaming. Implementing an interface for transducers would reduce a lot of the kitchensinkiness in this library.

https://github.com/jlongster/transducers.js
https://github.com/cognitect-labs/transducers-js
http://jlongster.com/Transducers.js--A-JavaScript-Library-for-Transformation-of-Data

Thoughts?
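
To make the idea concrete: a transducer describes a transformation (map, filter, ...) without saying where the values come from or where they go, which is why it could take over the transforms that have nothing to do with streaming. A minimal sketch in plain JavaScript; `mapT`, `filterT`, `compose` and `append` are hypothetical helpers, not the API of either library linked above:

```javascript
// Hypothetical helpers for illustration (not the transducers.js API):
// a transducer takes a reducing function and returns a new one.
const mapT = (f) => (step) => (acc, x) => step(acc, f(x));
const filterT = (pred) => (step) => (acc, x) => (pred(x) ? step(acc, x) : acc);

// compose(a, b)(step) === a(b(step)): values pass through a first, then b.
const compose = (...xfs) => (step) => xfs.reduceRight((s, xf) => xf(s), step);

// The only piece that knows about the output container.
const append = (acc, x) => { acc.push(x); return acc; };

const xform = compose(mapT((x) => x * 2), filterT((x) => x > 5));

// Any source that can be reduced works; here it is a plain array.
const out = [1, 2, 3, 4].reduce(xform(append), []);
console.log(out); // [ 6, 8 ]
```

Because `xform` never touches the array itself, the same value could in principle be driven by a Highland stream instead.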

vqvu commented 9 years ago

This looks cool. Initial thought is that I'm +1 for this.

(In reply to Matt Brennan's post of Thu, Jan 29, 2015 at 3:17 AM.)

apaleslimghost commented 9 years ago

Only trouble is that transducers have their own mechanism for laziness. I'm not sure how it would work with push/consume.

jeromew commented 9 years ago

+1 for keeping a sane highland footprint.

Yes, it is very interesting to see how "transform + reduce" is sufficient to do everything they mention.

What is not clear to me about transducers.js is that the only async use case mentioned in the article is the one with js-csp, so I don't see how we can reuse transducers.js. Maybe I missed something.

You would like transduce to be able to work with a Highland stream, do I understand correctly?

I am trying to see what that would mean in pseudocode, converting

_([1,2,3]).map(x => x + 1).map(x => x + 1).toArray(console.log)

to

function append(result, x) {
  result.write(x);
  return result;
}
var s1 = _()
transduce(map(x => x + 1), append, s1, _([1,2,3]));
into([], map(x => x + 1), s1);

Or maybe you mean that we should be able to use transducers the way they do in the js-csp example:

var xform = compose(map(x => x * 2),
                filter(x => x > 5));
_([1,2,3]).transduce(xform).resume()

Or maybe this is the same thing after all ;-)

vqvu commented 9 years ago

I imagine supporting this syntax.

var xform = compose(map(x => x * 2),
                filter(x => x > 5));
_([1,2,3]).transduce(xform).resume()

svozza commented 9 years ago

Yeah, I was looking into this a while back. I even got in touch with @kevinbeaty, who wrote underscore-transducer, and he gave me some great tips on how to go about providing transducer support in Highland, but I still haven't got round to acting on them.

You can actually use the Cognitect transducers with Highland right now by passing the result of the toFn function detailed here into _.reduce.

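var _ = require('highland');
var t = require('transducers-js');  // Cognitect's library, which provides toFn

var comp = t.comp, map = t.map, filter = t.filter, toFn = t.toFn;
var inc    = function (n) { return n + 1; },
    isEven = function (n) { return n % 2 === 0; };

var arr   = [0,1,2,3,4,5,6,7,8,9,10],
    apush = function (arr, x) { arr.push(x); return arr; },
    xf    = comp(map(inc), filter(isEven));

// Highland 2.x signature: reduce(memo, iterator)
_(arr).reduce([], toFn(xf, apush)).toArray(console.log);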

Or by wrapping the transducer using transduce-stream and using _.pipe:

var stream = require('transduce-stream');
// pipe() returns the destination Node stream, so wrap it back into Highland
_(_(arr).pipe(stream(xf))).toArray(console.log);

Unfortunately I didn't get any further than those toy examples.
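
The toFn route works because a transducer applied to a reducing function yields another reducing function, which _.reduce can call like any other. A rough sketch of the idea, assuming a transformer object with `init`/`step`/`result` methods (the shape used later in this thread); `mapT` and `toFnSketch` are hypothetical names, and this is not the transducers-js source:

```javascript
// Hypothetical map transducer over {init, step, result} transformers.
function mapT(f) {
  return (xf) => ({
    init: () => xf.init(),
    step: (acc, x) => xf.step(acc, f(x)),
    result: (acc) => xf.result(acc)
  });
}

// Hypothetical toFn-like helper: wrap a plain reducing function as the
// innermost transformer, apply the transducer, and expose only step.
function toFnSketch(xform, reducingFn) {
  const tform = xform({
    init: () => undefined,
    step: reducingFn,
    result: (acc) => acc
  });
  return (acc, x) => tform.step(acc, x);
}

const apush = (arr, x) => { arr.push(x); return arr; };
const inc = (n) => n + 1;

const result = [0, 1, 2, 3].reduce(toFnSketch(mapT(inc), apush), []);
console.log(result); // [ 1, 2, 3, 4 ]
```

Since only `step` is exposed, `result` is never called and a reduced value would not be recognised by the surrounding reduce.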

apaleslimghost commented 9 years ago

transduce-stream would also work well with _.through, I'm guessing.

svozza commented 9 years ago

Yeah, I think I tried it that way too and they both worked.

kevinbeaty commented 9 years ago

I'm obviously biased, but I think this is a great idea.

@jeromew

you would like transduce to be able to work with an highland stream do I understand correctly ?

Actually, no. You would use transduce when you are working with collections that are iterable. The implementation of reduce assumes you will always have the next iteration value. When working with streams, you would want to initialize a transducer and manually call step as you receive new items.

For example, transduce-stream uses transduce/push/asyncCallback to do this, which supports asynchronous iteration of transducer transformations by converting to a Node.js style callback and signalling completion with a continuation. You could potentially define a different asynchronous context that may make more sense within Highland.
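
A minimal sketch of that push-based pattern, assuming the same `init`/`step`/`result` transformer shape: the transformer is built once, `step` runs whenever an item is pushed in, and `result` runs when the source ends. `pushDriver` and `mapT` are hypothetical names, this is not how transduce-stream is written, and early termination via reduced values is left out:

```javascript
// Hypothetical map transducer over {init, step, result} transformers.
function mapT(f) {
  return (xf) => ({
    init: () => xf.init(),
    step: (acc, x) => xf.step(acc, f(x)),
    result: (acc) => xf.result(acc)
  });
}

// Hypothetical push-based driver: initialise once, step per item,
// call result when the source signals the end.
function pushDriver(xform, onValue, onEnd) {
  // Innermost transformer: "reducing" just means emitting downstream.
  const tform = xform({
    init: () => undefined,
    step: (acc, x) => { onValue(x); return acc; },
    result: (acc) => { onEnd(); return acc; }
  });
  let acc = tform.init();
  return {
    write: (x) => { acc = tform.step(acc, x); },
    end: () => { tform.result(acc); }
  };
}

const seen = [];
let ended = false;
const driver = pushDriver(mapT((x) => x * 10), (x) => seen.push(x), () => { ended = true; });

// Items may arrive at any time, e.g. from an event emitter.
driver.write(1);
driver.write(2);
driver.end();
console.log(seen, ended); // [ 10, 20 ] true
```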

or maybe this is the same thing after all ;-)

You are using the same transducers, but using them in different contexts.

@svozza I was wondering if you ever got around to working on this :)

You can actually use the Cognitect transducers with Highland right now with the toFn function

This is true, but you have to be careful as some transducers (take, takeWhile, etc.) may return a reduced value to signal early termination which would not be supported by most existing reduce functions. It could be easily added, just something to be aware of.
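
Adding that support mostly means checking each step's return value and stopping as soon as it comes back wrapped as reduced. A sketch, assuming the `__transducers_reduced__`/`value` marker used elsewhere in this thread; `reduced`, `takeT` and `reduceWithEarlyExit` are hypothetical helpers:

```javascript
// Hypothetical reduced wrapper using the marker seen in this thread.
function reduced(value) {
  return { __transducers_reduced__: true, value: value };
}

// Hypothetical take transducer (assumes n >= 1).
function takeT(n) {
  return (xf) => {
    let left = n;
    return {
      init: () => xf.init(),
      step: (acc, x) => {
        const res = xf.step(acc, x);
        left -= 1;
        return left > 0 ? res : reduced(res);
      },
      result: (acc) => xf.result(acc)
    };
  };
}

// A reduce that honours early termination.
function reduceWithEarlyExit(xform, reducingFn, seed, items) {
  const tform = xform({ init: () => seed, step: reducingFn, result: (acc) => acc });
  let acc = seed;
  for (const x of items) {
    acc = tform.step(acc, x);
    if (acc && acc.__transducers_reduced__) {
      acc = acc.value; // unwrap and stop pulling values
      break;
    }
  }
  return tform.result(acc);
}

const apush = (arr, x) => { arr.push(x); return arr; };
const firstThree = reduceWithEarlyExit(takeT(3), apush, [], [10, 20, 30, 40, 50]);
console.log(firstThree); // [ 10, 20, 30 ]
```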

svozza commented 9 years ago

I wonder whether this could be implemented with consume. Obviously not this naively, but something along the lines of:

Stream.prototype.transduce = function (xf) {
    // transformer() is a placeholder: I've no idea how the transformer's
    // init, step and result methods would be implemented
    var stepper = transformer(xf);
    var result = stepper.init();

    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            //how would errors work, end the stream and return the result
            //or try to carry on?
        }
        else if (x === nil || stepper.__transducers_reduced__) {
            push(null, stepper.result(result));
            push(null, nil);
        }
        else {
            result = stepper.step(result, x);
            next();
        }
    });
};
exposeMethod('transduce');

Or would it have to be lower-level than that? Or even higher-level, where you could cheat and use _.collect? :laughing:

vqvu commented 9 years ago

We definitely don't want to use collect, since we'd lose the lazy behavior.

Here's my attempt at an implementation:

var highlandTform = {
    init: function () {  },
    result: function (push) {
        push(null, _.nil);
        return push;
    },
    step: function (push, input) {
        push(null, input);
        return push;
    }
};

Stream.prototype.transduce = function (xf) {
    var tform = xf(highlandTform);

    return this.consume(function (err, x, push, next) {
        if (err) {
            // Pass through errors, like we always do.
            push(err);
            next();
        }
        else if (x === nil) {
            tform.result(push);
        }
        else {
            var res = tform.step(push, x);
            if (res.__transducers_reduced__) {
                tform.result(res.value);
            }
            else {
                next();
            }
        }
    });
};
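
To trace the control flow of this implementation outside Highland, the sketch below swaps in a hypothetical `runConsume` driver and `nil` marker that mimic just enough of consume (synchronous, no error propagation), plus a hypothetical `takeT` transducer that returns a reduced value. It is a simulation, not Highland's actual consume:

```javascript
// Hypothetical stand-ins for Highland's nil and consume(); synchronous,
// and errors are simply thrown rather than propagated.
const nil = {};

function runConsume(values, handler) {
  const out = [];
  let i = 0;
  let ended = false;
  const push = (err, x) => {
    if (err) throw err;
    if (x === nil) ended = true;
    else out.push(x);
  };
  const next = () => {
    if (ended) return;
    handler(null, i < values.length ? values[i++] : nil, push, next);
  };
  next();
  return out;
}

// Same shape as highlandTform: stepping means pushing downstream.
const highlandTform = {
  init: () => undefined,
  result: (push) => { push(null, nil); return push; },
  step: (push, input) => { push(null, input); return push; }
};

// Hypothetical take transducer (assumes n >= 1).
const takeT = (n) => (xf) => {
  let left = n;
  return {
    init: () => xf.init(),
    result: (acc) => xf.result(acc),
    step: (acc, x) => {
      const res = xf.step(acc, x);
      left -= 1;
      return left > 0 ? res : { __transducers_reduced__: true, value: res };
    }
  };
};

// The consume() handler logic from the implementation above.
function transduceHandler(xf) {
  const tform = xf(highlandTform);
  return (err, x, push, next) => {
    if (err) { push(err); next(); }
    else if (x === nil) { tform.result(push); }
    else {
      const res = tform.step(push, x);
      if (res.__transducers_reduced__) tform.result(res.value);
      else next();
    }
  };
}

const taken = runConsume([1, 2, 3, 4, 5], transduceHandler(takeT(2)));
console.log(taken); // [ 1, 2 ]
```

When take signals reduced after the second item, `result` pushes `nil`, so the driver stops pulling even though three more values were available.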

svozza commented 9 years ago

Tongue firmly in cheek with the _.collect suggestion. Very nice work!

vqvu commented 9 years ago

Haha, got it. I didn't notice the ":laughing:" at the end there. :sweat_smile:

svozza commented 9 years ago

Now to the real issue, are we going to have to wait until 3.0 before we can use this? :wink:

kevinbeaty commented 9 years ago

@vqvu Nice. I ended up "borrowing" a modification of this idea for transduce-stream to remove the dependency on transduce. Not sure why I didn't do something like this initially (if you have a hammer...)

One question, as I'm not that familiar with the Highland implementation: Does avoiding calling next in consume effectively destroy the stream? I noticed in the implementation that it will pause the stream. What happens if the stream is resumed? The initialized transformer is no longer in a state where it is safe to call either step or result after __transducers_reduced__.

vqvu commented 9 years ago

@kevinbeaty Not calling next doesn't do anything. It just means the callback won't be called again until you do. Moreover, it's not safe to call next after pushing nil.

What's happening here is that _.nil is always pushed after __transducers_reduced__. This ends the stream. Highland makes no guarantees about resuming an ended stream. You're just not supposed to do it.

@svozza I imagine we can have this as soon as someone writes the tests and docs for transduce :grinning:.

svozza commented 9 years ago

Ha. I'd be more than happy to help with the tests; I'm not sure I've got a deep enough understanding to do the docs, though.

kevinbeaty commented 9 years ago

@svozza A suggestion for the docs:

Stream.prototype.transduce(xf)

Consumes values from a Stream (once resumed) and returns a new Stream transformed by the transducer, xf.

This function can form the basis of many higher-level Stream operations. It will not cause a paused stream to immediately resume, but behaves more like a 'through' stream, transforming values as they are read by the transducer.

The transducer is a function that accepts a transformer that follows the transducer-protocol defined by transducers-js and compatible libraries.

Parameters

xf - Function - a transducer

var transducers = require('transducers-js');

var filter = function (f, source) {
    return source.transduce(transducers.filter(f));
};
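
For reference, the protocol published alongside transducers-js names the transformer methods `@@transducer/init`, `@@transducer/step` and `@@transducer/result`, and marks early termination with `@@transducer/reduced`/`@@transducer/value`; the snippets earlier in this thread use the older `init`/`step`/`result` and `__transducers_reduced__` names. A sketch of a filter transducer written against the `@@transducer/` names, with hypothetical helpers `filterT` and `transduceArray`:

```javascript
// Hypothetical filter transducer using the @@transducer/* method names.
function filterT(pred) {
  return (xf) => ({
    '@@transducer/init': () => xf['@@transducer/init'](),
    '@@transducer/step': (acc, x) => (pred(x) ? xf['@@transducer/step'](acc, x) : acc),
    '@@transducer/result': (acc) => xf['@@transducer/result'](acc)
  });
}

// Hypothetical driver that collects into an array and honours reduced values.
function transduceArray(xform, items) {
  const tform = xform({
    '@@transducer/init': () => [],
    '@@transducer/step': (acc, x) => { acc.push(x); return acc; },
    '@@transducer/result': (acc) => acc
  });
  let acc = tform['@@transducer/init']();
  for (const x of items) {
    acc = tform['@@transducer/step'](acc, x);
    if (acc && acc['@@transducer/reduced']) { acc = acc['@@transducer/value']; break; }
  }
  return tform['@@transducer/result'](acc);
}

const evens = transduceArray(filterT((x) => x % 2 === 0), [1, 2, 3, 4, 5, 6]);
console.log(evens); // [ 2, 4, 6 ]
```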

vqvu commented 9 years ago

@svozza See #223.