kriskowal / gtor

A General Theory of Reactivity
MIT License
3.03k stars 109 forks source link

How to create "pull streams"? #15

Open domenic opened 10 years ago

domenic commented 10 years ago

Let's say I wanted to create a readable file stream that only did I/O in response to being asked. I.e., it would be "lazy":

var stream = new Stream(/* ??? */);

// disk not touched

stream.next().then(iteration => {
  // disk was touched once to get this chunk
});

// or
stream.readInto(myArrayBuffer, 0, 1024).then(() => {
  // disk was touched once, reading 1024 bytes
});

How can I do that with this stream design? The setup function seems designed only for those who want to "push" data at the consumer.

kriskowal commented 10 years ago

This is related to the paper that @andywingo shared with us on ES-Discuss, How to add laziness to a strict language without even being odd. I need to read the paper carefully before using the "even" and "odd" terminology, but essentially you need to construct one value eagerly and call write first, then wait for it to be consumed before producing another. The desired behavior for a lazy stream is that you wait first, then produce a new value. One of these is even and the other is odd, or perhaps iambic and trochic.

kriskowal commented 10 years ago

Here’s a sketch that produces the described behavior.


var Iteration = require("../iteration");
var Stream = require("../stream");
var Promise = require("../promise");

function Producer(callback) {
    this.callback = callback;
}

Producer.prototype = Object.create(Stream.prototype);
Producer.prototype.constructor = Producer;

Producer.prototype.next = function () {
    console.log("NEXT");
    var self = this;
    return new Promise(function (resolve, reject) {
        self.callback(
            function write(value) {
                resolve(new Iteration(value, false));
            },
            function close(value) {
                resolve(new Iteration(value, true));
            },
            function abort(error) {
                reject(error);
            }
        );
    });
};

var start = Date.now();
var n = 0;
new Producer(function (write, close, abort) {
    if (n < 10) {
        write(n++);
    } else {
        close();
    }
})
.forEach(function (value) {
    console.log(Date.now() - start, value);
    return Promise.delay(1000);
}, null, 2)
kriskowal commented 10 years ago

Note that in this sketch, the producer does not deal with back pressure. None of the producer methods return promises. The producer starts a job for each request, where a stream would produce the value from the last job and start the next.

kriskowal commented 10 years ago

It occurs to me that I sketched Stream.from(iterable). This would make it possible to do, for example:

var n = 0;
Stream.from({
    next: function () {
        return new Iteration(n++);
    }
})

The sketch does not yet work, but it serves purpose and mirrors Array.from. However, the meter would still be eager, not lazy. Stream.from would call iterator.next immediately and stream.next would receive that result, as opposed to stream.next causing iterator.next to be called.