I approve of consume.
Hmm, if you want this to appeal to a standards committee, you probably should expand the names into something more verbose, with camel case. read -> readNext, stop -> abortStream, consume -> readFrom
Of course, it will need a constructor type and an isSimpleStream static method.
I'm actually being serious. Add extra stuff so that people can argue about it, and then vote to leave off the extras.
@dominictarr I think in recent times the W3C standards favor short names.
They probably will need a constructor though.
The problem with needing a constructor is that it assumes a prototype and a dependence on a proper "this" when calling. Also, I much prefer defining an interface rather than a concrete type. I don't want to have to subclass Stream to create a stream.
+1 for avoiding this
As far as always requiring that sinks be objects, I think that's unnecessary. A stream is very often a value you pass around, and it makes sense for it to be an object because it has two very distinct channels (data and close). But a sink is usually either a standalone helper function in some library or a part of some other object. The only thing I want to standardize is the property name for the sink in the case where it's part of a duplex stream object. I think consume is fine. I chose sink to be consistent with min-streams and also because it's four letters, just like read and stop.
@creationix
var writer = window.FileWriter(location1)
var stream = window.FileStream(location2)
writer.consume(stream)
I have a feeling that the W3C may like that sort of interface more. It also means you can do duck type checks to see whether something is a sink.
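For example, a duck-type check under that convention could be as simple as the following sketch (isSink is a hypothetical helper name, not part of any proposal here):
// Hypothetical helper: treat anything with a consume method as a sink.
function isSink(obj) {
  return !!obj && typeof obj.consume === "function";
}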
The example is while (part = yield stream.read), which feels weird. Having while (part = yield stream.read()) would be nicer, but it would murder the API for the sake of generator convenience.
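For context, a rough sketch of what that loop looks like inside a generator runner; run here stands in for whatever generator-driven control-flow helper is in use and is an assumption, not part of the proposal:
// Sketch: the runner is assumed to call any yielded continuable with a
// callback and resume the generator with the resulting value.
run(function* () {
  var part;
  // stream.read is a continuable, so yielding it lets the runner call it for us.
  while (part = yield stream.read) {
    console.log("chunk:", part);
  }
  // read called back with undefined, so the stream has ended.
});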
Yeah, the fact that read is a continuable is just a coincidence, not an API goal.
We should specify the allowed states. A stream is basically zero or more values followed by an end (an end which is either natural or caused by an error)
We should specify that once the callback fires, any call to .read() will not return any more values. Optionally specify that when you call .stop(), and even before the callback fires, it should not return any values to the read callback. Basically it would be useful to spec out the relationship between stop() and read().
Also I don't think the callback should be optional.
fileStream.stop(null, function (err) {
if (err) { /* disk close error */ }
})
stop() may cause some kind of error when the stream tries to close itself; when that happens it should inform someone.
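To make the intended state machine concrete, here is a minimal sketch of an array-backed source that tries to follow those rules (no values after end, stop forces an early end, stop's callback reports any close error). The names match the read/stop proposal, but the implementation is only illustrative:
// Illustrative only: an in-memory source honoring the proposed rules.
function arraySource(items) {
  var index = 0;
  var ended = false;
  return { read: read, stop: stop };
  function read(callback) {
    if (ended || index >= items.length) {
      ended = true;
      return callback(); // end: no value, no error
    }
    callback(null, items[index++]);
  }
  function stop(err, callback) {
    // err (the reason) is accepted, but an in-memory source has nothing to do with it.
    ended = true;  // read will never hand out values again
    callback();    // closing an in-memory source can't fail
  }
}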
I have called a pullTransform a duplex function before, and I would do duplex TCP like
// tcp.createServer := (Number, (Stream) => Stream)
tcp.createServer(8080, function (socket) {
return socket
});
A more interesting one would be
// tcp.createServer := (Number, (Stream) => Stream)
tcp.createServer(8080, function (socket) {
return jsonSerialize(app(jsonParse(socket)))
});
@Raynos I really like the tcp accepting a transformer instead of just providing a duplex object and ignoring the return value!
I'm fine with stop requiring the callback if you feel strongly about it. It will make the API a little harder to use, but considering it's not a hot path for most programs, that should be fine.
We should specify the allowed states. A stream is basically zero or more values followed by an end (an end which is either natural or caused by an error)
Yes
We should specify that once the callback fires, any call to .read() will not return any more values. Optionally specify that when you call .stop(), and even before the callback fires, it should not return any values to the read callback. Basically it would be useful to spec out the relationship between stop() and read().
This gets tricky in larger chains. The source may be done sending events and indeed never send anything after calling the callback to .stop(), but the layers after it may still have data pending in the pipeline that eventually comes out. The stop channel is very fast and will often be a direct reference to the stop function in the source. The spec should then say:
The source will never output data events after a stop request has been received, but be aware that other layers downstream may still contain data.
Though, I guess since technically every layer exports a new "source" interface, it would need to observe the same rule for stop and clear its data queue when stop is called. This makes chaining stop much harder, hmmmm.
module.exports = function (source) {
  var dataQueue = []; // ... plus whatever other setup the transform needs
  return { read: read, stop: stop };
  function stop(err, callback) {
    dataQueue.length = 0; // drop anything still buffered
    source.stop(err, callback);
  }
  function read(callback) {
    // serve from dataQueue first, otherwise pull more from source
  }
}
Even truncating dataQueue, there can still be pending read calls to the parent source, and a flag would need to be set that tells onRead to ignore the value when it finally returns (or call onRead directly, or something otherwise crazy).
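A sketch of that flag pattern, continuing the illustrative transform above (the stopped flag is a made-up internal name):
// Illustrative: ignore values from reads that were in flight when stop was called.
module.exports = function (source) {
  var dataQueue = [];
  var stopped = false;
  return { read: read, stop: stop };
  function stop(err, callback) {
    stopped = true;
    dataQueue.length = 0;
    source.stop(err, callback);
  }
  function read(callback) {
    source.read(function onRead(err, value) {
      if (stopped) return callback(); // pretend the stream already ended
      // ... normal transform logic would go here
      callback(err, value);
    });
  }
}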
oh, I just realized that this API is nearly identical to the iterator API used in https://github.com/rvagg/node-leveldown (see also https://github.com/dominictarr/async-iterator). We originally adopted it because it would be easy to wrap into either streams1 or streams2, and it modeled the underlying leveldb API pretty closely.
we chose next and end, but same idea otherwise.
Regarding a consume method, I think consistency trumps necessity. There are places where you don't care whether something is a complete sink or just one side of a transform. Also, if a sink is an object, then you can explain sources, explain sinks, and then explain a transform as both a source and a sink.
I started using the word "sink" to designate a stream that has no readable side; this reflects the usage of the word "sink" in graph theory (http://en.wikipedia.org/wiki/Sink_(disambiguation)). Having a method "sink" on a transform stream breaks this.
@dominictarr oh cool. Do you think it would be confusing to use the same names but have slightly different semantics? I see leveldown doesn't accept a reason (error) when ending, and my data events won't have key and value, just value. Otherwise it seems very close.
I think we could get away with not having err in stop/end/close if we slightly changed how error propagation worked. In the new system, end would be used only to notify upstream that we won't be consuming anymore. We could send an appropriate error event downstream at the same time from wherever the error started. Upstream would never know about the error, but I think that's fine. It just needs to know it can safely clean up resources.
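A sketch of what that could look like in a middle layer: when something goes wrong, it ends upstream without a reason and emits its own error event downstream. Names follow the next/end discussion above; the layer itself is illustrative, not spec text:
// Illustrative: upstream only learns "we're done"; downstream gets the error.
function parseLayer(source) {
  var failure = null;
  return { next: next, end: source.end };
  function next(callback) {
    if (failure) return callback(failure); // keep signalling end-with-error downstream
    source.next(function (err, value) {
      if (value === undefined) return callback(err);
      try {
        callback(null, JSON.parse(value));
      } catch (parseError) {
        failure = parseError;
        source.end(function () {}); // tell upstream to clean up; it never sees the reason
        callback(failure);          // send the error event downstream
      }
    });
  }
}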
hmm, yes I think that leveldown could be made compatible with this proposal. the difference is very slight.
@dominictarr
regards a consume method, I think consistency trumps necessity. There are places where you don't care whether something is a complete sink, or just one side of a transform. Also, if a sink is an object, then you can explain sources, explain sinks, and then explain a transform as both a source and a sink.
I started using the word "sink" to designate a stream that has no readable side - this reflects the usage of the word 'sink' in graph theory http://en.wikipedia.org/wiki/Sink_(disambiguation) . having a method "sink" on a transform stream breaks this.
Having a transform that is an object with both readable and writable ends is interesting. (also much closer to how node works with readable and writable streams)
var source = fs.readStream("input.txt");
var sink = fs.writeStream("output.txt");
function transform() {
return {
next: function (callback) { ... },
end: function (callback) { ... },
consume: function (stream) { ... }
}
}
// Very nice chaining API though
sink.consume(transform()).consume(source);
But this has issues. next and end have no meaning and nothing to pull from until consume is called, setting up their data source. To enable chaining, consume could return the thing it just consumed.
The sink will start pulling from the transform before it has a chance to connect to its source. This would mean all next functions would need an extra state to handle this early call and defer calling the callback, as sketched below. The nice syntax may or may not be worth this cost. (Keep in mind most transforms would already have internal queues, flags and checks. Adding one more isn't too bad in most cases.)
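A sketch of that extra deferred state for the chaining variant (pendingRead and the pass-through body are made up for illustration):
// Illustrative: a transform whose source arrives later via consume().
function transform() {
  var source = null;       // set once consume() is called
  var pendingRead = null;  // a read that arrived before we had a source
  return { next: next, end: end, consume: consume };
  function next(callback) {
    if (!source) { pendingRead = callback; return; } // defer until connected
    source.next(callback); // a real transform would do its work here
  }
  function end(callback) {
    if (!source) return callback();
    source.end(callback);
  }
  function consume(stream) {
    source = stream;
    if (pendingRead) { var cb = pendingRead; pendingRead = null; next(cb); }
    return stream; // returning the consumed thing enables chaining, per the note above
  }
}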
var source = fs.readStream("input.txt");
var sink = fs.writeStream("output.txt");
function transform(source) {
return {
next: function (callback) { ... },
end: function (callback) { ... }
};
}
// Very simple consuming API
// note that I'm still expressing the sink as an object even though the transform is just a function.
sink.consume(transform(source));
If a transform was modeled as a function that accepted a source and returned a new source, it would be more straightforward.
Now duplex streams (as opposed to transform filters) are modeled well as a single object with {next, end, consume}, and app logic where symmetry is needed is easy to model.
var jsonCodec = require('json-codec');
var lineCodec = require('line-codec');
tcp.createServer(8080, function (socket) {
// socket is a "duplex" stream with {next, end, consume}
// Apply protocol de-framing and framing on the duplex stream
socket = lineCodec(socket);
// Apply JSON parsing and Serialization to the duplex stream
socket = jsonCodec(socket);
// App is a simple echo server, so echo objects back
socket.consume(socket);
// or written as a single expression
socket.consume(jsonCodec(lineCodec(socket)));
});
But if we make the TCP library act like a filter itself, then socket is no longer duplex.
var json = require('json-codec');
var line = require('line-codec');
tcp.createServer(8080, function (socket) {
// Written as a sequence of actions
socket = line.deframe(socket);
socket = json.decode(socket);
socket = app(socket);
socket = json.encode(socket);
socket = line.frame(socket);
return socket;
// Or written as one expression:
return line.frame(json.encode(app(json.decode(line.deframe(socket)))));
});
function app(stream) {
// Just an echo server
return stream;
}
I'm fine with stop requiring the callback if you feel strongly about it.
I don't mind too much, I think it would make things simpler.
I think we could get away with not having err in stop/end/close if we slightly changed how error propagation worked
That could work nicely; all upstream would do with it is echo it back anyway, so that's probably better.
I am wary of
function transform() {
return {
next: function (callback) { ... },
end: function (callback) { ... },
consume: function (stream) { ... }
}
}
That is a function that takes no arguments and returns two things as a result. I am personally going to avoid this and focus on fn(input) -> output instead of fn() -> { input, output }.
We can write a simple series helper, like @creationix's series function and @dominictarr's continuable-series module, to make working with a chain of duplex function stream things easier.
I am +1 for this as it allows us to model implementation details as functions and embed a helper flow-control function in the app code without relying on a simple-stream flow-control library.
It also means the implementation of "json-codec" probably has no dependencies, as it's a simple transform function.
var json = require('json-codec');
var line = require('line-codec');
tcp.createServer(8080, function (socket) {
return series(
json.decode,
line.deframe,
app,
line.frame,
json.encode
)(socket)
});
function app(stream) {
// Just an echo server
return stream;
}
function series() {
var args = [].slice.call(arguments)
return function duplex(stream) {
for (var i = 0; i < args.length; i++) {
stream = args[i](stream)
}
return stream
}
}
@raynos, so then it looks like you vote for modeling transforms as function (stream) -> stream and letting external libraries make it pretty (basically what I've been doing for min-streams all along).
yes!
Ok, so recap with the latest API proposal:
var stream = {
next: function (callback) { ... },
end: function (callback) { ... }
};
function transform(stream) {
// ...
return { next: next, end: end };
function next(callback) { ... }
function end(callback) { ... }
}
var duplexStream = {
next: ...
end: ...
consume: ...
};
A transform function could be a duplex transform and accept a duplex stream and return a new duplex stream. Though it's probably better to write transforms as separate encode and decode. A generic duplex transform that accepted encode and decode would be easy to write.
function duplex(decode, encode) {
return function (original) {
var transformed = decode(original);
transformed.consume = function (source) {
original.consume(encode(source));
};
return transformed;
};
}
Usage of duplex above would be:
// Create a duplex stream
var socket = tcp.connect(1337);
// The protocol is framed, let's remove that layer.
socket = duplex(line.deframe, line.frame)(socket);
// The protocol is also JSON encoded
socket = duplex(json.decode, json.encode)(socket);
Hmm, I just realized this duplex helper is just a weaker version of the series helper from above, from the point of view of the consumer. It works differently inside because it's working with duplex streams and not simple streams, though.
@creationix I think the symmetrical sugar is weak enough that something like
function serialize(stream) {
return series(line.deframe, json.decode, stream, line.frame, json.encode)
}
Would cover most use cases.
I have yet to really understand the motivation for that one. I find function (stream) -> stream simple enough that I don't know when to use a push transform instead.
var sink = {
consume: function (stream, callback) { }
}
If a sink took a callback, it could call it with an error if it failed to consume the source. It can also call it without an error to signal that it finished consuming the source.
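A sketch of a sink like that, using the {next, end} stream shape discussed above; logWrite is a made-up stand-in for whatever the sink does with each value:
// Illustrative sink: drains a stream and reports success or failure exactly once.
var sink = {
  consume: function (stream, callback) {
    stream.next(function onNext(err, value) {
      if (value === undefined) return callback(err); // finished (or upstream error)
      logWrite(value, function (err) {               // logWrite is hypothetical
        if (err) return stream.end(function () { callback(err); });
        stream.next(onNext);
      });
    });
  }
};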
I took some time to put together a little playground to test simple-streams out. I have yet to test out the duplex pattern.
Some thoughts:
- is it the responsibility of the sink to call .end on its input stream to allow for cleanup? or is stream.end() more exceptional than that?
- what do we do with extra read callbacks? the dom example has a filter stream module that collects a list of callbacks. right now I simply truncate that buffer on end, then forward the end. is this correct?
- there's more boilerplate than using through, but impressively, not much.
, but impressively, not much.@chrisdickinson I would assume that if a stream ends with or without an error then you should NOT call .end()
/ .stop()
/ .abort()
on said stream.
Also, you can do filter a lot more simply:
function filter(lambda) {
return function duplex(stream) {
return { next: next, end: stream.end }
function next(callback) {
stream.next(function onread(err, value) {
if (value === undefined) {
return callback(err)
}
var keep = lambda(value)
if (keep) {
return callback(null, value)
}
stream.next(onread)
})
}
}
}
@creationix with min-streams I believe calling next() and end() concurrently was invalid, because both next() and end() were the same function. Maybe we should disallow the consumer of a stream from calling those two functions concurrently and force them to wait for a result from next() before they can call end().
sinks need more of a spec. in particular, I didn't know how to communicate completion/errors out to client, non-stream code. I'm +1 on @Raynos's suggestion.
In min-streams, sinks return continuables for exactly this reason. It's worked out great so far in my js-git code. The continuable will resolve with the end/error event in the stream.
it feels much easier to get right than min-streams.
That's the goal.
is it the responsibility of the sink to call .end on its input stream to allow for cleanup? or is stream.end() more exceptional than that?
No, you only call end if you want to end the stream early. Perhaps calling it end is confusing with the end in node's writable stream interface?
what do we do with extra read callbacks? the dom example has a filter stream module that collects a list of callbacks. right now I simply truncate that buffer on end, then forward the end. is this correct? there's more boilerplate than using through, but impressively, not much.
After sleeping on it, I think it's best to say that calling .end() doesn't guarantee the data stream will stop right away. That adds too much boilerplate and extra code to each and every layer for little gain. Also I don't think the source needs to insert an end event into the stream when .end() is called. The "end" event in the stream is for natural ends. If a filter in the middle wants to clean something up, it needs to listen for both "end" events and the callback to .end(callback), since either could end the stream (and both may happen sometimes).
As far as truncating callbacks, just make sure to always eventually call every callback. It really messes up programs when callbacks never get called.
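In other words, a layer that buffers read callbacks should flush them when the stream finishes, along these lines (a sketch with made-up internals, not spec text):
// Illustrative: never strand a buffered callback; hand each one the end event.
function flushPending(pendingCallbacks, err) {
  while (pendingCallbacks.length) {
    var callback = pendingCallbacks.shift();
    callback(err); // end (or end-with-error), no value
  }
}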
btw I would vote for { read, abort } as more intuitive than { next, end }. Also, "just make sure to always eventually call every callback" answers a lot of questions about read and abort interaction.
Yeah. I was thinking that cancel or abort would be more descriptive than end. +1 on a name change.
I think there needs to be very little interaction between read and abort. If you try to read from a source that's been stopped, it will simply emit end. So I take back what I said about pieces in the middle needing to intercept the abort call. Everyone can just keep reading from their source till value is undefined and then know the stream is done.
var stream = {
read: function (callback) { /* callback(err, value) */ },
abort: function (callback) { /* callback(err) */ }
};
function transform(source) {
return { read: read, abort: source.abort };
function read(callback) {
source.read(function (err, value) {
if (value === undefined) return callback(err);
callback(null, value.toUpperCase());
});
}
}
Also I think I want to model tcp streams as stream transforms instead of duplex streams. Adding in duplex complicates the model a lot.
// Echo server
tcp.createServer(8080, function (socket) {
return socket;
});
// Echo client
tcp.connect(8080, function (socket) {
return socket;
});
As to @chrisdickinson's question about the cases where you really need a writable interface (where normal imperative logic is better than a state-machine transformer):
tcp.createServer(8080, function (input) {
var output = writableSource();
// output has both readable and writable interfaces:
// { read(callback), abort(callback), write(value, callback), end(err, callback) }
// or { read(callback), abort(callback), emit(err, value, callback) }
// The exact interface for writable doesn't matter because it's not part of the simple-stream spec.
return output;
});
Though you usually also want your input to be pushed to you in the app case, so the push-filter interface is best here I think.
tcp.createServer(8080, pushToPull(function (emit) {
// Call emit(err, item) every time we want to write data outwards.
return function (err, item) {
// Called every time data is written to us
};
}));
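For reference, here is one rough way such a pushToPull adapter could be built on top of the { read, abort } interface. This is only a sketch under the signatures shown above (it ignores concurrent-read edge cases and assumes the filter forwards the end event via emit); it is not the actual min-stream helper:
// Sketch: adapt a push-style filter (emit -> write) into a pull-style one (source -> source).
function pushToPull(pushFilter) {
  return function (source) {
    var queue = [];   // output events the filter emitted but nobody has read yet
    var reads = [];   // read callbacks waiting for output
    var write = pushFilter(function emit(err, item) {
      var callback = reads.shift();
      if (callback) callback(err, item);
      else queue.push([err, item]);
    });
    function read(callback) {
      if (queue.length) {
        var event = queue.shift();
        return callback(event[0], event[1]);
      }
      reads.push(callback);
      pull();
    }
    function pull() {
      source.read(function (err, item) {
        write(err, item);
        // A filter may emit zero events for a given input; keep pulling until
        // every waiting reader is satisfied or the source has ended.
        if (reads.length && item !== undefined) pull();
      });
    }
    return { read: read, abort: source.abort };
  };
}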
What about if a transform stream is still a {consume} but it just returns itself? Sure, there is a thing here where someone might read from it before it has an input, but that can be abstracted, and is even useful in some cases.
At least, since the stated benefit of objects is that you get structural typing, making other streams objects but transforms not objects is not consistent. Conceptually, they are still streams.
I don't see transforms as streams. That's just a pattern I've seen done in node where there are duplex streams. Transforms can be modeled as duplex streams, but I don't feel it properly represents them.
To me, having only streams be objects is quite symmetrical and clean. The only basic type here is the stream. It's the only thing in the spec that everything else has to agree on. Things like sources, filters (transforms), and sinks are simply functions that either accept or return streams. Push-filters are just an easy way to create normal stream consuming filters and are really not part of the spec.
So to further explain, let's take some known APIs from the non-streaming world. A string is like a stream, but instead of through time, it's a stream of bytes through memory, but all seen in an instant. If I wanted to take a string of JSON and transform it into the object that it represents, I don't use an object to do that conversion, I use a function.
var obj = JSON.parse(json);
Likewise if I have a stream of raw json strings through time and want a new stream of parsed objects through time, I would do the same thing.
var objStream = json.decode(jsonStream);
The signature of JSON.parse is (jsonString) -> object and the signature of json.decode is (stream<jsonString>) -> stream<object>.
right, I see, so the only difference between this design and pull/min is that it's two separate functions, rather than one that combines them.
@dominictarr basically yes, and I changed the error handling slightly so that you don't send a reason when aborting the stream and you don't wait for the source to reflect the reason, but send your own error downstream directly.
Ok, I've updated the official spec to reflect these changes. It's looking real clean. https://github.com/creationix/js-git/blob/master/specs/simple-stream.md
@dominictarr also, since sink isn't part of the stream spec, you're free to implement your transforms/filters as objects if you want. I can still interop as long as we all use the same interface for the streams themselves. As for me personally, I much prefer filters being functions that accept and return streams. I aim to write all js-git related filters as functions.
so, I can think of a few situations where the reason might be important. example: on tcp you want to know if the stream failed because there was no server, or if it dropped the connection, or if it timed out.
if you were writing to a file, you want to know if the error was that you didn't have permissions, or if you ran out of disk space. hmm, although I guess you could just use the continuable to get the error type...
@creationix
// Echo client
tcp.connect(8080, function (socket) {
return socket;
});
THAT. THAT A MILLION TIMES. client and server just became the exact same code. This is amazing.
The only basic type here is the stream. It's the only thing in the spec that everything else has to agree on.
I have found the difference between standardizing on promise and continuable to be about standardizing the types and not the functions. With streams we can all agree on what Readable and Writable are, and then we can go do our own things with transform functions, monads or duplex stream objects.
sink = function that accepts a stream (and returns a continuable)
It might make sense for sink to be a function that returns an object with a consume method for purposes of structural typing. That consume method then accepts a stream and returns a continuable.
See full spec here
Sinks as object: I think having { consume } or some other name would be nice. cc @dominictarr
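For example, a structurally-typed sink along those lines might look like this sketch, using the read/abort names from the updated spec; writeValue is a made-up stand-in for whatever the sink actually does with each item:
// Illustrative: consume(stream) returns a continuable that resolves on end or error.
function makeSink() {
  return {
    consume: function (stream) {
      return function (callback) {       // the continuable
        stream.read(function onRead(err, value) {
          if (value === undefined) return callback(err); // stream finished
          writeValue(value, function (err) {             // writeValue is hypothetical
            if (err) return stream.abort(function () { callback(err); });
            stream.read(onRead);
          });
        });
      };
    }
  };
}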