
Streams pain points #89

Closed chrisdickinson closed 9 years ago

chrisdickinson commented 9 years ago

This is a place to enumerate pain points / problems with streams as they exist presently. Please focus on problems, clarifying comments and questions, or statements of commiseration for specific problems. Proposed solutions should be posted as separate issues that link back to this issue. Proposed solutions should have enough example / backing code to have an informed discussion about the tradeoffs, but are not expected to be production ready / fully working.

If the pain point is "rip out streams from core" -- that can be discussed over here. :)

I'll start:

tracker1 commented 9 years ago

1 - bringing in the functionality of event-stream into core streams would be really nice (which includes through2 and the like).

2 - being able to easily create a generic stream that you can write to, that passes through would also be nice.

3 - a first class error method (Stream.prototype.error) would be helpful for binding/passing, vs stream.emit.bind(stream, 'error')

4 - having methods bound to the context internally would be a bonus as well... stream.write.bind(stream) vs simply passing stream.write;


The following pseudo-example illustrates what I mean here, and something that would be closer to what I'd like to see.

mylib.foo = function getDataRowStream(query, params) {
  var stream = Stream.create(true /*object mode*/);  // 2 above, simple create of through stream
  queryDataAndPipeToStream(query, params, stream);
  return stream;
}

function queryDataAndPipeToStream(query, params, stream) {
  getDatabaseConnection()
    .then(function(conn){
      var req = new dal.Request(conn);
      req.stream = true;
      req.on('row', stream.write /*methods bound to stream instance*/)
      req.on('error', stream.error /*emits 'error' event*/)
      req.on('complete', stream.end);
      req.query(query, params)
    })
    .catch(stream.error);
}
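For comparison, here is a rough sketch (not the author's code) of how this could be approximated with today's core streams, using an object-mode PassThrough and manually bound helpers; getDatabaseConnection and dal are the same placeholders as in the example above:

var PassThrough = require('stream').PassThrough;

mylib.foo = function getDataRowStream(query, params) {
  var stream = new PassThrough({ objectMode: true }); // stand-in for Stream.create(true)
  queryDataAndPipeToStream(query, params, stream);
  return stream;
};

function queryDataAndPipeToStream(query, params, stream) {
  // manually bound helpers -- the boilerplate that points 3 and 4 above would remove
  var write = stream.write.bind(stream);
  var end = stream.end.bind(stream);
  var error = stream.emit.bind(stream, 'error');

  getDatabaseConnection()
    .then(function (conn) {
      var req = new dal.Request(conn);
      req.stream = true;
      req.on('row', write);
      req.on('error', error);
      req.on('complete', function () { end(); });
      req.query(query, params);
    })
    .catch(error);
}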
chrisdickinson commented 9 years ago

(EDIT: this was originally quoting the comment above, which was since edited)

I think the disconnect for me, is going from something like creating a connection to a database (where a promise is a great fit) to something where a stream makes sense (returning rows of data) is a bit weird.

I might be misinterpreting this quote, so forgive me if I'm just blathering here:

Promises are great for representing single async operations that happen exactly once, pass or fail -- they are a chainable primitive for those sorts of operations. Node's primitive for that class of operation is the Node-style callback -- which is, notably, not inherently chainable. Streams (collectively including readables, writables, duplexes, and transforms) represent a series of zero or more data events over time, that may eventually end or fail.

chrisdickinson commented 9 years ago

Some more pain points:

  1. There are two .pipe functions -- Stream.prototype.pipe and ReadableStream.prototype.pipe. This is confusing for end-users and causes bugs.
  2. Streams are based on event emitters, but listening to certain events (readable and data) has implicit side effects.
  3. Streams documentation is currently a mix of reference, implementation guide, usage guide, and advanced usage -- it's hard to follow and link into from other docs, or consume as a single document.
aredridel commented 9 years ago

Those last bits are SO TRUE to my experience.

tracker1 commented 9 years ago

@chrisdickinson I removed the part you quoted... I was merely expressing a thought in my head that it would be nice if there was a cleaner way to chain against a Promise that resolved to a Stream. It's not spec, but if a Promise was also an EventEmitter that would emit against the resolved item, it could double as a ReadableStream...


Another minor pain point is that Writable streams don't emit end, they emit finish... which means when my final chain goes from a through stream to a writable io stream, my end/finish event handler needs to change. It would be nice if writable streams emitted end as well as finish after the final flushed output.

jonathanong commented 9 years ago

a lot of my grievances are already issues in node: https://github.com/joyent/node/issues/created_by/jonathanong

a major issue for web framework maintainers is leaks: https://github.com/jshttp/on-finished#example. requiring us to use https://github.com/stream-utils/destroy and https://github.com/jshttp/on-finished is a symptom of a broken stream implementation, imo.

but personally, i'd rather have readable-stream be separated from core, have all the stream development progress be moved into the WHATWG Stream specification, and allow users to use that as the stream implementation (assuming it fixes these bugs).

mjackson commented 9 years ago

There's no spec for streams

This is definitely the main pain point for me. That's it. If there were a formal streams spec that everyone could run their own little implementations on to test for conformance that would fix a LOT of issues.

@chrisdickinson I'm sure you've seen https://github.com/promises-aplus/promises-tests. I'm tempted to do something like that for streams. Think it would help us all get on the same page?

A first pass could just spec out the existing implementation in node, trying to catch as many corner cases as possible. Then, anyone who wanted could make a competing streams implementation, similar to how when.js, rsvp, bluebird, etc. compete on speed and features.

chrisdickinson commented 9 years ago

re: @mjackson:

This is definitely the main pain point for me. That's it. If there were a formal streams spec that everyone could run their own little implementations on to test for conformance that would fix a LOT of issues.

The other side of this is: if we spec streams as they exist currently, the difficulty of changing/improving streams goes up. Are we comfortable with how streams currently work, and are we willing to support that for the long haul? I agree that the eventual goal of streams should be a spec, but on the other hand, I feel like committing to what we have now may be premature.

EDIT: That said, I'd totally welcome a stream spec suite so we can see clearly what behaviors we're dealing with!


re: @jonathanong:

but personally, i'd rather have readable-stream be separated from core, have all the stream development progress be moved into the WHATWG Stream specification, and allow users to use that as the stream implementation (assuming it fixes these bugs).

The first part of that is under debate over here; I've made a big list of concerns with that approach as well. Re: the second half, the problem with using the WHATWG spec is that it's built around a core primitive that Node doesn't use -- namely, Promises.

Going through your linked issues, it seems like the lion's share of problems has to do with resource or handle-backed streams and how they interact (or, rather, don't interact as expected) with the pipe mechanism -- I totally agree that that behavior needs to be shored up; maybe as something that builds on top of vanilla streams, rather than baked into the base Readable and Writable classes.

jmar777 commented 9 years ago

Copying these over from https://github.com/iojs/roadmap/issues/1:

1) The lack of error-forwarding via pipe() removes much of its elegance. The docs like to make things look simple, as in this example:

var fs = require('fs');
var zlib = require('zlib');

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

Beautiful, except you can't just add a single error handler on the end... you need one for each stream. And you could actually get errors from all three of them, so you can't just forward them to a callback or anything sane like that... it just gets gnarly way too fast. (See the sketch after this list.)

2) This is much broader, but the whole readable/writable dichotomy is confusing in a way that spiders out into virtually all stream implementations too. I think there are enough examples out there of "read only" stream APIs that make a compelling enough case for that (I even think @creationix experimented with wrapping node streams like that at one point).

3) Object mode. I sort of get why they ended up in there, but if there's a specific point in time where streams jumped the shark...

I don't think any of those are actionable so long as node compatibility is a requirement, but I guess that's a different topic.
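To illustrate the first point, a hedged sketch of what the docs' three-line pipeline actually needs once errors are handled; the cleanup shown is only indicative:

var fs = require('fs');
var zlib = require('zlib');

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');

// pipe() does not forward errors, so every stream in the chain needs its own handler,
// and any of the three can fail.
function onError(err) {
  console.error('gzip pipeline failed:', err);
  // abort / clean up each stream as appropriate for your application
}

r.on('error', onError);
z.on('error', onError);
w.on('error', onError);

r.pipe(z).pipe(w).on('finish', function () {
  console.log('done');
});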

gaearon commented 9 years ago

Coming from an Rx background, I was unpleasantly surprised by how unintuitive piping is. Most examples of downloading a file (arguably the simplest use case for streams and piping) that I found in blogs and on StackOverflow would get error handling, or flushing, or the finish/close/disposal sequence wrong. Seriously. (Oh noes.)

IMO Node's pipe is somewhat like jQuery's “promise”-mutating then in this respect: broken by design due to absence of ownership and reader/writer separation in the API. In contrast, Rx Observables separate consuming from creating, like real Promises do, so there is no room for the whole class of mistakes.

Rx also has first-class disposables, as in you have to return a “disposable” from Observable constructor, and there are several built-in utility disposables for simpler composition.

I can be wrong though! Pardon me if I'm missing something obvious about how streams should be implemented.

sonewman commented 9 years ago

I think the problem with error forwarding is that if you had a long chain of piped streams, it would be hard to determine where the error actually resides and deal with it in an appropriate manner. As far as I know, with streams (though similar in nature to a promise, with unlimited resolves of data before a final end resolve, if you squish them into a promise paradigm) the idea is more about each individual stream taking ownership of its responsibilities. That actually gives the implementer much more control, albeit with extra code and edge-cases which they themselves are required to take responsibility for dealing with.

sonewman commented 9 years ago

Also, I get the point about the end event on a readable and the finish event on a writable, but what about transform streams? The finish event gets called when .end is called on the writable side, as it should, even if a stream consuming the readable end has not finished reading it. Only once that is done will it emit the end event. I think it is important to differentiate between the two events even though it might seem a little confusing semantically.
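A small sketch of the ordering described above, assuming a plain PassThrough: finish fires once the writable side is done, end only after the readable side is actually consumed:

var PassThrough = require('stream').PassThrough;

var t = new PassThrough();

t.on('finish', function () {
  console.log('finish: writable side done, nothing consumed yet');
});
t.on('end', function () {
  console.log('end: readable side fully consumed');
});

t.write('hello');
t.end(); // 'finish' is emitted around here

setTimeout(function () {
  t.resume(); // start consuming; only now does 'end' follow
}, 100);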

joepie91 commented 9 years ago

From an application developer point of view, these are my main concerns with streams2:

  1. There is a pipe event on Writable streams, but not on Readable streams. It can be useful to be notified when something starts being piped elsewhere, and it's not obvious how to be made aware of this. Overriding _read or pipe methods is a hack at best, especially with third-party streams.
  2. Apparently it's only possible to set objectMode true for either both input and output, or neither. To set either of the two to object mode, but not the other, you need a custom constructor. This should really be easier to do.

Aside from those two points, I haven't really found any issues with streams so far, from my point of view. I'm actually finding them to be rather practical to work with.

sonewman commented 9 years ago

Having a pipe event on a duplex/transform/readable & writable stream would be confusing. How would you know whether it was piping or being piped if you had the same event for both?

Overriding the pipe method is not too difficult, if you absolutely needed that:

var Readable = require('stream').Readable

var pipe = Readable.prototype.pipe
Readable.prototype.pipe = function (dest, options) {
  this.destinations = this.destinations || []
  this.destinations.push(dest)

  dest.on('finish', function () {
    var i = this.destinations.indexOf(dest)
    ~i && this.destinations.splice(i, 1)
  }.bind(this))

  return pipe.call(this, dest, options)
}

With regards to question 2, if you needed to pipe a transform object stream into a buffer there is no reason you can't do this in your transform:

var Transform = require('stream').Transform
var inherits = require('util').inherits

function ObjToBufferStream() {
  Transform.call(this, { objectMode: true })
}
inherits(ObjToBufferStream, Transform)

ObjToBufferStream.prototype._transform = function (data, enc, next) {
  try {
     next(null, new Buffer(JSON.stringify(data)))
  } catch (err) {
     next(err)
  }
}

var objToBufferStream = new ObjToBufferStream()
objToBufferStream.pipe(process.stdout)

objToBufferStream.end({ obj: true })
sonewman commented 9 years ago

@chrisdickinson I am curious, having heard the discussion in the TC about potentially moving streams development out of core and introducing it as a bundled dependency:

What sort of API would you give streams if you could start from scratch? Would you go for a promise-backed approach such as @domenic's https://github.com/whatwg/streams?

aredridel commented 9 years ago

You can in fact set objectMode for readable vs writable, but it's ugly: this._readableState.objectMode = true
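For instance, a hedged sketch of that hack: a Transform whose writable side takes ordinary buffers/strings while its readable side emits objects. Mutating _readableState is implementation detail rather than documented API, as noted further down:

var Transform = require('stream').Transform;
var inherits = require('util').inherits;

function LineToObjectStream() {
  Transform.call(this); // not objectMode overall...
  this._readableState.objectMode = true; // ...but the readable side is (the ugly hack)
  this._buffered = '';
}
inherits(LineToObjectStream, Transform);

LineToObjectStream.prototype._transform = function (chunk, enc, next) {
  this._buffered += chunk.toString('utf8');
  var lines = this._buffered.split('\n');
  this._buffered = lines.pop(); // keep the trailing partial line
  for (var i = 0; i < lines.length; i++) {
    this.push({ line: lines[i] }); // objects out
  }
  next();
};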

sonewman commented 9 years ago

@aredridel this is true, albeit the opposite to my example.

But this would only really matter for a duplex stream; in any case you are still going to need to implement the internal _read() method and use push() to accommodate the format of your data.

If you were implementing a transform stream, and it was just set to objectMode, that doesn't mean you can't write buffers to it. I think determining objectMode for one side or another is a non-issue.

No matter what is implemented, the hardest part is the conversion between formats (e.g. buffers to objects), as data across the wire is not always deterministic and could very well run over the boundaries of a chunk.

This is an issue no matter what encoding you have, unless you are using only JavaScript objects directly (and then you can't really manage memory effectively).

My example is contrived but the only important thing is the part:

next(null, JSON.stringify(data))

all the rest is purely contextual.

Personally, when I see properties like _readableState, _writableState or _transformState I consider them implementation detail and not API, unless documented otherwise (such as in the case of the _read, _write and _transform methods of the streams inheritance API).

As far as I am concerned, technically implementation detail is free to change without breaking semver!

bodokaiser commented 9 years ago

While implementing a WebSocket parser using stream.Transform I always wished for some feature where you could reliably push data back to read it the next time. This comes in handy when you need four bytes in a row to extract data and you don't need to manage the buffer yourself. I think #unshift was once intended to do this but it never worked for me.

Besides that, I would like to have streams working much more as expected.
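Since push-back is not available, the usual workaround is to keep the leftover bytes in a private buffer inside the transform. A hedged sketch, assuming a simple framing of a 4-byte big-endian length header followed by a payload:

var Transform = require('stream').Transform;
var inherits = require('util').inherits;

function FrameParser() {
  Transform.call(this);
  this._buf = new Buffer(0); // leftover bytes we could not parse yet
}
inherits(FrameParser, Transform);

FrameParser.prototype._transform = function (chunk, enc, next) {
  this._buf = Buffer.concat([this._buf, chunk]);

  // emit as many complete frames as the buffered data allows
  while (this._buf.length >= 4) {
    var len = this._buf.readUInt32BE(0);
    if (this._buf.length < 4 + len) break; // wait for more data
    this.push(this._buf.slice(4, 4 + len));
    this._buf = this._buf.slice(4 + len);
  }
  next();
};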

sonewman commented 9 years ago

@bodokaiser This is an interesting idea... so you are describing a way of putting some data back onto the write buffer and applying back pressure. Unshift would not act in that way, because it is on the Readable side so it would only put it on the start of the readableState buffer.

This is definitely food for thought!

bodokaiser commented 9 years ago

@sonewman yes, indeed. I always wondered why this method was missing as streams should be chunk-able

sonewman commented 9 years ago

I think this would be tricky to implement with what we have currently; for the purposes of this explanation I will call this method this.pushBack (this being the stream.Writable of course).

We currently create a new WriteReq when adding chunks to the _writableState buffer, and this contains the encoding and the writecb supplied to .write(data, encoding, function () { /* this callback */ }). The question is: if we were to call this.pushBack(chunk), would the callback fire? Because technically some of the data would have been handled.

This could lead to confusion (and inconsistent behaviour) if it was decided that callback should be called multiple times, or alternatively if it was only called on the first handle and was not for the subsequent chunk.

The other problem is that when a chunk is handled, it transfers the encoding and callback onto the _writableState before the _write method is called. When we call next in _write or _transform it eventually calls state.onwrite, which in turn calls the callback associated with that chunk either on the next tick or immediately (if the writable end is being uncorked and the buffer cleared, or it is being written to and there is anything previously backed up in the buffer).

(stream.Transform calls this via callback inception, in case following the source causes mind bending! :smile:)

The above scenario would mean that this.pushBack() would need to be called synchronously within the _write or _writev method. Otherwise the encoding and callback could be lost when the handling of the next chunk happens synchronously, or before the point in the next tick at which the callback was added (which would likely occur if we used process.nextTick in _write or _writev and then called next); this could cause some very strange behaviour.

I think we would need to decide if this functionality was definitely something that we really want to add to stream.Writable as it would most likely depend on some larger internal restructuring. In addition we would need to watch closely to ensure that engineering this does not affect the streams performance.

natevw commented 9 years ago

The API does not feel comprehensive enough. What I mean is:

All in all I'm happy to leave the nitty-gritty details of basic stream.Readable/stream.Writable concerns to people with more skin in the game. (+ALL THE VOTES to more rigorous documentation of event timing and other "lifecycle" stuff though…) But the higher-level stuff built on top (thinking mostly stream.Transform and stream.Duplex and the missing stream.DuplexTransform), that does not seem to have been composed in quite a thorough enough way.

sonewman commented 9 years ago

I am curious, what problem does a DuplexTransform solve? Making a basic transform from a Duplex is as simple as calling .push in the ._write method.
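For instance, a minimal sketch (illustrative names) of a Duplex behaving like a transform by pushing from _write:

var Duplex = require('stream').Duplex;
var inherits = require('util').inherits;

function UpcaseDuplex() {
  Duplex.call(this);
  // end the readable side once the writable side has finished
  this.on('finish', function () { this.push(null); });
}
inherits(UpcaseDuplex, Duplex);

UpcaseDuplex.prototype._write = function (chunk, enc, next) {
  this.push(chunk.toString('utf8').toUpperCase());
  next();
};

UpcaseDuplex.prototype._read = function (n) {
  // nothing to do: data is pushed from _write as it arrives
};

process.stdin.pipe(new UpcaseDuplex()).pipe(process.stdout);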

I am quite interested in the ideas @chrisdickinson has been experimenting with in chrisdickinson/flows.

I am now convinced that this thing we know of as streams should probably be a much higher level abstraction providing sugar over a simpler callback model. The EventEmitter pattern will always have a place, but it allows further complexity to breed within, particularly when an object is emitting events which it deals with internally.

With the right prototype construction a lot of the inner workings could be accessed and extended.

aredridel commented 9 years ago

I wonder if the right pattern is streams as a simple 'readable' callback, and a read method, and build from there.

sonewman commented 9 years ago

Yeah, I am in agreement. There is also obviously the interesting debate over abstractions for push and pull streams. There is a justification for both, like waiting for data events or calling read in which the _read method calls push on the readable.

The question is whether they should both have similar APIs with callback signatures. In the future this could all be handled by using async and await.

Personally I find these abstractions more complex to reason about than the callback pattern. (But hey, people want synchronous looking code, because they have to think less about simplifying code abstractions.)

The other thing is that if you are waiting for callbacks, then by implementing an interface where you can have multiple callbacks listening to the same flow of data, or piping to multiple destinations, you end up reimplementing an event-listener pattern.

The challenge will be looking outside the box for the answer, and the current streams2/3 implementation has a big ecosystem around it now, so it is going to be difficult to budge without a compelling enough reason and interoperability.

bjouhier commented 9 years ago

@bodokaiser Look at https://github.com/Sage/ez-streams#lookahead and https://github.com/Sage/ez-streams/blob/master/lib/helpers/binary.md All this is so easy to implement when you have a simple stream API.

bjouhier commented 9 years ago

@aredridel You don't need a readable event + a read method. All you need is a read(cb) method.

An event which is fired only once, and always in a context where it is expected, is an anti-pattern. It should not be an event, it should be a simple continuation callback API. There are several examples of this anti-pattern in node APIs: read/readable, write/drain, connect/connect. There may be an event at the low level but it should be encapsulated and exposed as a continuation callback function to the rest of the code.

The evented API forces you to have 1 function + 1 event for the success case + an error event for the failure case. Moreover the error event comes de-correlated from the call that provoked it so error handling is hard.

With a continuation callback API you have the guarantee that 1) the callback will eventually get called, 2) it will be called exactly once and 3) the outcome (result or error) can be easily correlated to the operation that generated it.

The evented API is complex and brittle. The callback API is simple and robust. KISS!
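A hedged sketch of that read(cb) style, wrapping a core Readable so that every read is answered exactly once with (err, data) and data is null at end of stream; callbackReader is an assumed helper, not a library API, and some edge cases are glossed over:

function callbackReader(readable) {
  var ended = false;
  readable.on('end', function () { ended = true; });

  return {
    read: function (cb) {
      if (ended) return process.nextTick(function () { cb(null, null); });

      var chunk = readable.read();
      if (chunk !== null) return process.nextTick(function () { cb(null, chunk); });

      // nothing buffered yet: wait for exactly one of readable / end / error
      function onReadable() { cleanup(); cb(null, readable.read()); }
      function onEnd() { cleanup(); cb(null, null); }
      function onError(err) { cleanup(); cb(err); }
      function cleanup() {
        readable.removeListener('readable', onReadable);
        readable.removeListener('end', onEnd);
        readable.removeListener('error', onError);
      }
      readable.once('readable', onReadable);
      readable.once('end', onEnd);
      readable.once('error', onError);
    }
  };
}

// usage sketch:
// var reader = callbackReader(fs.createReadStream('file.txt'));
// reader.read(function (err, chunk) { /* chunk is null at end of stream */ });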

sonewman commented 9 years ago

@bjouhier I'm not sure what you meant:

An event which is fired only once, and always in a context where it is expected, is an anti-pattern.

Is contradicted with:

With a continuation callback API you have the guarantee that 1) the callback will eventually get called, 2) it will be called exactly once and 3) the outcome (result or error) can be easily correlated to the operation that generated it.

With the Readable API there is no way to pass a callback that handles an error on that read, whereas a Writable stream does accept one when calling .write().

The idea of a read(cb) API is definitely a good one, and I have been thinking along similar lines myself. It would work as an abstraction for both a push and a pull stream. However, how would you determine whether it is a pushing or pulling model?

Essentially that is what we are doing in the case of a readable event, we are saying that the stream can be read().

In the case of a pushing stream you would likely want to add one handle and do something with the result, just like our current data or readable events.

In a pull stream, you would need to provide a similar event to trigger the read, or you would need to continually call read, e.g. (crude example):

// crude sketch of a hypothetical readable.read(cb) API
var bufferedData = ''

;(function pullData() {
  process.nextTick(() => {
    readable.read((err, data) => {
      if (err) {
        // handle error
        return
      }

      bufferedData += data
      if (!readable.ended) pullData()
    })
  })
}())

I do wonder though whether it would be possible to simplify the callback, particularly on a Writable because if you were to allow the callback to return an error you would have a lot of arguments to deal with (err, data, encoding, next).

I think this debate is good as there are a lot of ideas bouncing around here. I like the approach of ez-streams, however from an outsider's perspective it doesn't really look that simple to use, or rather it perhaps has a steeper learning curve to use effectively, although I can agree that the model of use is powerful.

The approach is quite similar to other libraries dealing with Immutable data structures. It seems to create a lot of objects in the process of handling data though, I would be interested to see a benchmark comparison with streams from core.

creationix commented 9 years ago

@sonewman I recently discovered a pattern for writing parsers that's really simple and elegant. It works wonderfully in Lua where there are multiple return values, but it should work in JavaScript too.

The basic idea is to have a function that gets called for each chunk to parse. If there isn't enough data to parse yet (you read the 4 byte length header, but don't have enough for the body that goes along with it), you simply return null. The next time data comes in, your function will be called with the original data plus the extra already buffered and concatenated. If you do have enough data, you return the decoded value along with the extra unused buffer (if any).

Since JavaScript can't return multiple values, you'd have to return an array or find some other way to both output the decoded value and the extra data. One idea is having this.buffer be the input data and simply replacing it with the new shorter data buffer. You will keep getting called over and over till you signify no more data is wanted.

This decoder style can be used with any stream interface. I have something like ez-streams in my lua code and the bridge between them is only about 10 lines of code.

What I really like about this decoder style is that it's really fast. The parser is often completely stateless and doesn't need any closures. It's just a function that takes in a chunk and returns a value and leftover chunk (or nothing if not enough data was input). Unit testing these decoders is trivial.
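A hedged JavaScript sketch of that decoder pattern (names are illustrative, framing assumed to be a 4-byte big-endian length followed by the payload): the decoder either returns null to ask for more data or a [value, rest] pair:

// decode(buffer) -> null if there is not enough data yet,
//                -> [value, rest] once a complete frame is available
function decode(buffer) {
  if (buffer.length < 4) return null;
  var len = buffer.readUInt32BE(0);
  if (buffer.length < 4 + len) return null;
  return [buffer.slice(4, 4 + len), buffer.slice(4 + len)];
}

// a tiny driver that any stream interface could feed with incoming chunks
function makeDecoder(decode, onValue) {
  var buffered = new Buffer(0);
  return function (chunk) {
    buffered = Buffer.concat([buffered, chunk]);
    var result;
    while ((result = decode(buffered)) !== null) {
      onValue(result[0]);
      buffered = result[1];
    }
  };
}

// usage sketch: socket.on('data', makeDecoder(decode, handleFrame));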

aredridel commented 9 years ago

Oh that sounds delightful, @creationix!

algesten commented 9 years ago

@creationix

Since JavaScript can't return multiple values

This is just one of the many reasons we use coffeescript ;)

sonewman commented 9 years ago

@creationix That sounds really awesome, and very simple. I am thinking the way to pipe out into multiple streams from a stream could be just to create a new "stream" with the sole purpose of multiplexing, if that was needed. That would make the parts of a pipeline much more simple and granular, since we could plug in only what we need.

I guess my reservations with ez-streams is that it seems to do a lot of things, but what you described is like the primitive form, on which almost any abstraction could be built.

I would be very interested to see an implementation of this, do you have an example? (Although my brain seems to be exploding with ideas on how this could be done!)

It works a lot like what we have to do when parsing some form of encoded data in a stream anyway, since what is being parsed has the possibility of crossing over the boundaries of the given chunk.

The idea that the callback would continually get called unless signified otherwise sounds a lot like calling cork() on a streams3 Writable with a _writev method instead of _write, however just providing the bare minimum to achieve this would likely reduce most of the complexity.

The main contract would be that the callback receives an error, or an array of buffers/data, or null (following the node callback pattern, of course).

I am just wondering how the readable side would work, because we could essentially still need two buffers in order to handle back pressure from a stream that we might be piping to.

There are other things, such as ending a stream, and the potential to drain to send final bytes on an ending transform, which would need consideration. But I think this could definitely be the way to go! Whether io.js core would want to adopt this is another thing. But benchmarking a comparison as a proof of concept would be worth the time. It would also be trivial to use in the browser too, since it would likely require no dependencies.

This has definitely been a nice bite of food for thought though! Thanks! :smiley:

bjouhier commented 9 years ago

@sonewman

An event which is fired only once, and always in a context where it is expected, is an anti-pattern.

Is contradicted with:

With a continuation callback API you have the guarantee that 1) the callback will eventually get called, 2) it will be called exactly once and 3) the outcome (result or error) can be easily correlated to the operation that generated it.

I probably wasn't very clear. Let me rephrase:

If you have a function whose event fires only once, pass or fail, and always in a context where it is expected, then I think that this function should have a callback API, not an evented API.

Regarding parsing, I do it with a transform. There are also a number of transforms in the lib/transforms directory (some trivial ones like lines and CSV but also some non-trivial like streaming JSON and XML parsers).

I got a bit lost in your discussion about push/pull streams and events on the writable side. In my design there are no events at all in the stream API and no need to distinguish between pull and push streams (*). It's all done with 3 calls: read(cb) and abort(err) (**) on the readable side, and write(data, cb) on the writable side. That's the fundamental API. All the rest is sugar. Most of the sugar API is completely straightforward to implement once you have the basic API.

(*) There is nevertheless a uturn device that turns a writable into a readable and which is very handy for implementing higher level APIs (transform or dup, for example).

(**) abort is missing, but I'm going to introduce it.

sonewman commented 9 years ago

Merry christmas to all! :tada:

@bjouhier I guess I have spent too long in the depths of the streams code, trying to understand what it does, lol. It probably does sound very confusing; I know I was confused for a long time about exactly how it works.

With regards to push or pull styled streams ideally both approaches should provide a similar styled API like your read() method although the fundamental paradigms are different, since a push stream needs to apply back pressure by indicating to the source "don't give me any more data" and the pull stream approach needs to decide when to stop asking for more data.

With regards to Writable events, I was just comparing it to the current functionality in a streams3 stream. The features of _writev are not well documented because up until now they have only been present in v0.11; if 0.12 of Node had been released then this would likely be part of the API documentation. In streams3 you can call cork() on a Writable stream and it will buffer up all the stuff written to it; then when you call uncork() you either get all the underlying _write calls at once, or, if you instead define a _writev method, it is called once with the writable buffer array so you can handle it all at once. This is similar to what @creationix was talking about with an array of data.
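A hedged sketch of that cork()/uncork()/_writev behaviour (logging stands in for a real syscall):

var Writable = require('stream').Writable;
var inherits = require('util').inherits;

function BatchingWritable() {
  Writable.call(this);
}
inherits(BatchingWritable, Writable);

BatchingWritable.prototype._writev = function (chunks, next) {
  // chunks is an array of { chunk, encoding } objects -- one batch, one "syscall"
  var joined = Buffer.concat(chunks.map(function (c) { return c.chunk; }));
  console.log('flushing %d chunks, %d bytes in one go', chunks.length, joined.length);
  next();
};

BatchingWritable.prototype._write = function (chunk, enc, next) {
  // still used for ordinary, uncorked single writes
  console.log('single write of %d bytes', chunk.length);
  next();
};

var w = new BatchingWritable();
w.cork();
w.write('<html>');
w.write('<body>hello</body>');
w.write('</html>');
process.nextTick(function () { w.uncork(); }); // _writev receives all three at once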

This type of functionality would be particularly useful in a standard website approach where you just send a blob of HTML markup or JSON; rather than doing multiple system write calls to a socket, it is more performant for a regular use case, as discussed by @trevnorris in this talk: http://youtu.be/jsiqvXi3qSA

I do find your transform approach interesting too, providing the reader and writer to the callback. This suggests a transform would consist of 3 objects: the reader, the writer and the transform itself. Is this correct, or am I missing something?

ken-okabe commented 9 years ago

This is my multi-post, https://github.com/iojs/io.js/issues/93#issuecomment-68115797

so excuse me, but I think it's better to post here now that I have found this thread.

In my humble opinion, stream of Node/io.js has fundamental problems, and I have never liked stream, and the fundamental problem of stream is so obvious to me.

I understand that all streams are instances of EventEmitter.

As you know, this implementation makes it very hard to make everything work well. This is simply because a stream is data on a time-line, and so far the Node guys have not solved this time-line problem well.

In order to solve this time-line problem fundamentally, you must employ Functional Reactive Programming (FRP) paradigm.

What is (functional) reactive programming?

I do resonate with Laurence G's simple description that FRP is about "datatypes that represent a value 'over time' ". Conventional imperative programming captures these dynamic values only indirectly, through state and mutations. The complete history (past, present, future) has no first class representation. Moreover, only discretely evolving values can be (indirectly) captured, since the imperative paradigm is temporally discrete. In contrast, FRP captures these evolving values directly and has no difficulty with continuously evolving values.

FRP is also unusual in that it is concurrent without running afoul of the theoretical & pragmatic rats' nest that plagues imperative concurrency. Semantically, FRP's concurrency is fine-grained, determinate, and continuous. (I'm talking about meaning, not implementation. An implementation may or may not involve concurrency or parallelism.) Semantic determinacy is very important for reasoning, both rigorous and informal. While concurrency adds enormous complexity to imperative programming (due to nondeterministic interleaving), it is effortless in FRP.

So, what is FRP? You could have invented it yourself. Start with these ideas:

  • Dynamic/evolving values (i.e., values "over time") are first class values in themselves. You can define them and combine them, pass them into & out of functions. I called these things "behaviors".
  • Behaviors are built up out of a few primitives, like constant (static) behaviors and time (like a clock), and then with sequential and parallel combination. n behaviors are combined by applying an n-ary function (on static values), "point-wise", i.e., continuously over time.
  • To account for discrete phenomena, have another type (family) of "events", each of which has a stream (finite or infinite) of occurrences. Each occurrence has an associated time and value.

and so on. Read on the link above.

Do not push or pull data, instead, leave them alone, and make the entire Evented IO data as immutable stream data on time-line and make the stream data to be used by functions.

Node/io.js is for JavaScript, and JavaScript is a functional language. So why not make io.js stream as FRP stream? Currently, not.


piping multiple streams into a single stream?

Easy. FRP stream can be operated in any way by functional programming.


Node/io.js is a back-end technology, and currently, the fundamental data structure, which is stream, is not FRP base.

On the other hand, as a front-end technology, currently, facebook commits FRP base projects, extremely actively. GitHub decided to employ facebook-react.

React A declarative, efficient, and flexible JavaScript library for building user interfaces. http://facebook.github.io/react/ https://github.com/facebook/react

IMMUTABLE Immutable persistent data collections for Javascript which increase efficiency and simplicity. http://facebook.github.io/immutable-js/ https://github.com/facebook/immutable-js

As I said, Node/io.js stream is not declarative or immutable, and that is the fundamental problem.

Also, I already have some conceptual FRP code by myself. It's very powerful with a very simple idea. If you are interested in, let me know.

Regards and Merry christmas.

bjouhier commented 9 years ago

@sonewman First a quick answer to your transform question: reader and writer are objects but the transform itself is just a function. Typical code:

function myTransform(cb, reader, writer) {
  // implement your transform here: 
  // read with reader.read
  // write transformed stuff with writer.write.
  // call cb() when you are done with the stream
}

// use transform in chain
source.transform(myTransform).filter(myFilter).map(myMapper).pipe(cb, destination);

There is a big design difference between an ez-streams chain and a classical node chain, and it has to do with push, pull, backpressure and buffering. Let me try to explain it with an example. Let us start with a typical node chain:

source.pipe(unzipper)
    .pipe(parser)
    .pipe(mapper)
    .pipe(formatter)
    .pipe(zipper)
    .pipe(destination)

In this chain, all the elements (source, unzipper, parser, mapper, formatter, zipper and destination) are evented streams. They all deal with stream events and they all have to handle backpressure.

The corresponding ez-stream chain is:

source.transform(unzipper)
    .transform(parser)
    .map(mapper)
    .transform(formatter)
    .transform(zipper)
    .pipe(cb, destination);

In this chain, only source and destination deal with events and backpressure. The other elements are just callback-based functions:

function unzipper(cb, reader, writer) { ... }
function parser(cb, reader, writer) { ... }
function mapper(cb, elt) { ... } // simpler signature for map
function formatter(cb, reader, writer) { ... }
function zipper(cb, reader, writer) { ... }

If source is dealing with a device that naturally pushes data (a TCP/IP socket for example), source is responsible for handling the little pause/resume dance. This means that source will do a bit of buffering (usually, you'll make that configurable). But all this is encapsulated in the source. The rest of the chain interacts with the source through a simple read(cb) method.

Similarly destination may deal with a slow device. In this case it will be responsible for handling the little write/drain dance. But the rest of the chain interacts with destination through a simple write(data, cb) method.

The big difference is that you don't have to deal with events and backpressure everywhere in your processing chain, you only need to deal with them in the end nodes (and you may not even need to deal with them on the right side of the chain because the last operation will not always be a pipe, it could be a forEach that executes operations, or a reduce that computes a result).

This is a big simplification for the chain itself. For example here is a simple mapper:

// use as <some chain>.map(upcase).<more chain>
function upcase(cb, data) { cb(null, data.toUpperCase()); }

And the map method itself is completely trivial.
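Not ez-streams itself, but a sketch of how such a map could be built on the read(cb)/write(data, cb) contract described above, with the mapper keeping the (cb, data) signature used in these examples:

function map(reader, fn) {
  return {
    read: function (cb) {
      reader.read(function (err, data) {
        if (err) return cb(err);
        if (data == null) return cb(null, null); // end of stream propagates
        fn(function (err, mapped) { cb(err, mapped); }, data);
      });
    }
  };
}

// usage sketch with the upcase mapper above:
// var upper = map(source, upcase);
// upper.read(function (err, data) { /* 'HELLO' */ });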

Compare this with writing a node duplex stream that upcases strings.

I don't care if people take ez-streams literally or not (there may be a bit too much sugar for some people's taste). What I care about is the concept. Streams are complex today because events and backpressure are everywhere. If you deal with them in special devices that only surface a simple callback API, you simplify things dramatically.

sonewman commented 9 years ago

Hi @kenokabe, I can certainly see the position you are coming from. However, fundamentally the approach of functional reactive programming is suggestive of push evented streams, since we are required to signify an update to some data.

I do believe these data structures are great at handling 'the evolution of data over time' or 'the lifecycle of a data set'. Admittedly an internal array or 'buffer' of data (e.g. _*stream*State.buffer) could be classified as a data set; however, I think it would be over-engineering for this use case.

Coincidentally I am currently working on reactjs application (for one of the largest global grocery retailers based in the UK), and we are using Immutable.js for all of our application state data structures.

I have been sat scratching my head for a few weeks trying to draw the gap between the awesomeness of Immutable data structures and cursors with what we know of as streams in io.js/node.

I disagree with the premise that Immutable data structures are a silver bullet for all kinds of data structures and I am not sure it is appropriate for this kind of low-level functionality. I am however happy to be proved wrong :smiley: !

IMO Immutable data structures work amazingly well for state, when a flow of data is updating the current state, it is awesome to get granular levels of detail into the changes in nested data structures and the performance of diff'ing provided by using these patterns.

However I think the stream abstraction in io.js is a fundamentally different problem to solve.

To me the essential point to a low level stream is that it should contain as little state as possible. This is the key to making it as memory efficient as possible, since it is intended as a low level abstraction, in the most part for handling binary data.

In the best case scenario and there is no back pressure a stream literally just marshals data.

However, I am not saying that such an API should or could not be inspired by that of immutable libraries.

@bjouhier I think I understand much better the problem that ez-stream is solving after that explanation. It's funny I had fleeting thoughts perhaps a year ago about the idea of batch operations on a chunk, then I completely forgot about it...

So it is great to see that you have run with that idea, I can certainly see the power of this approach, it would definitely provide powerful functionality with a lazy operational model. The API is very FRP inspired and I do like that.

As @creationix said this type of abstraction would be simple to apply on top of a simple base stream implementation and is a must have in the toolbox for serious streaming!

(Without delving through the code, I assume that the chained operations all happen in deferred process.nextTick()s in order to prevent blocking.)

The point I disagree with is the one about creating a toUppercase Duplex (or, more appropriately, a Transform) stream, as it is pretty simple, and even less boilerplate code is needed if you use through2 by @rvagg, but that is by the by.
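For reference, a hedged sketch of that through2 version; the boilerplate reduces to a single transform callback:

var through2 = require('through2');

var upcase = through2(function (chunk, enc, callback) {
  // callback(err, data) pushes the transformed chunk downstream
  callback(null, chunk.toString('utf8').toUpperCase());
});

process.stdin.pipe(upcase).pipe(process.stdout);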

bjouhier commented 9 years ago

@sonewman

(Without delving through the code, I assume that the chained operations all happen in deferred process.nextTick()s in order to prevent blocking.)

Very simple operations like map execute in current tick but you can defer them by calling nextTick inside your mapper. More complex operations (all transforms - and even simple things like filter that are implemented with transform) are all deferred.

Regarding toUppercase, you are right that through2 saves you the pain of implementing a Duplex but my point was that all the functional sugar (map but also filter, every, some, reduce, concat, tee, dup, etc.) becomes trivial to implement.

Another important point is error handling: if an error occurs anywhere in the chain, it will always end up in the continuation callback at the end of the chain. That's why all reducers (pipe, forEach, reduce, every, ...) take a continuation callback. This callback is where you will handle errors, but also where you will put code that needs to execute after the chain has been fully processed.

Contrast this with node chains where you have to attach an error listener to every pipe if you want to intercept errors that may happen anywhere in the chain.

ken-okabe commented 9 years ago

Hi @sonewman Thanks for your thought.

Coincidentally I am currently working on reactjs application (for one of the largest global grocery retailers based in the UK), and we are using Immutable.js for all of our application state data structures. I have been sat scratching my head for a few weeks trying to draw the gap between the awesomeness of Immutable data structures and cursors with what we know of as streams in io.js/node.

This sounds great, at least, to me :)

I forgot to mention @dominictarr 's library in my previous post. https://github.com/dominictarr/event-stream EventStream is like functional programming meets IO

He also points out an approach to FRP for node IO.

The EventStream functions resemble the array functions, because Streams are like Arrays, but laid out in time, rather than in memory.

However, obviously his implementation is on top of the layer of the current stream, so I am sure this implementation is not the solution for our fundamental issue here.


So, here's my thought.

First of all, I have never committed code to the low level EventEmitter -> stream implementation, so I might be terribly wrong; if you find that so, please tell me and excuse me. Lately, I have used this FRP idea in my own JavaScript project, but I am not sure my conceptual approach can apply to such a low-level data management layer, so that is my concern here. Anyway, nothing to lose, let me give it a shot.

I disagree with the premise that Immutable data structures are a silver bullet for all kinds of data structures

IMO, Immutable data is the only silver bullet since any other mutable data is not logic and only immutable data is logic. Let's look at increment.

i++ is mutable. This is i = i + 1, so not a logic.

In most of the case, especially for IO, we do like i++ in mutable manner. Node stream do that too, and it is not logic, and that is the fundamental problem.

To me the essential point to a low level stream is that it should contain as little state as possible. This is the key to making it as memory efficient as possible, since it is intended as a low level abstraction, in the most part for handling binary data. In the best case scenario and there is no back pressure a stream literally just marshals data.

I agree. 100%.

Stream data in FRP context is immutable and stateless. Stream is just group of data aligned on time-line, past, present and future, infinite data like natural numbers or Fibonacci sequence. Streams are like Arrays, but laid out in time, rather than in memory.

In functional programming, we can deal such an infinite sequence using lazy evaluation(call-by-need). We won't calculate whole infinite sequence, we cannot, and we don't have to. You know well since you kindly mentioned that you are using Immutable.js for all of your application. FRP is the same. Obviously, we cannot calculate future stream data aligned on time-line. We don't have to.

In fact, there are some immutable and stateless object to deal data aligned on time-line in real world.

DVD(Blu-ray Disc).

Obviously, DVD is an immutable and stateless object, unless you overwrite the recorded data.

You may think, "Oh, it's just recorded data, here, we must deal real-time event of IO."

Hey, we also have such a thing in real world.

WebCam or Digital Video Camera.

A webcam is a video camera that feeds or streams its image in real time to or through a computer to computer network. When "captured" by the computer, the video stream may be saved, viewed or sent on to other networks via systems such as the internet, and email as an attachment.

DVD/WebCam model is a true FRP model. The entire event is recorded and once recorded, the data is immutable and stateless. Every frame of the movie/clip has a time-stamp. You can access any frame with a time-stamp argument. It's functional and transparent.

Twitter time-line is also a FRP model. Twitter asks "What's happening?" You post what you are doing right now. <-- IO event happens!! And the whole now is recorded to the time-line with the time-stamp. If you want to access other guy's now, just click on the username. You can access his now from his time-line. To be precise, it's not technically his now but the most recent; however, it is his now in the Twitter context. Very functional and transparent. Twitter time-line is immutable and stateless.

Of course, the data of WebCam or Twitter time-line is piled on top of the existing data, and I would call this phenomenon, as the future data aligned on time-line appears to the present. It's still immutable and stateless as same as infinite natural numbers or Fibonacci sequence. It's simply a matter of lazy evaluation in time, rather than in memory.

Here, Twitter time-line event is discrete. It's not happening in every time-stamp in a certain resolution of time-line. However, you can still access the most recent event prior to any time-stamp, functionally.

So, that is my own FRP concept, or DVD/WebCam/Twitter-timeline model.

I employ this concept to my own conceptual FRP library in JavaScript.

https://github.com/kenokabe/spacetimeline

npm install spacetimeline

Also, some facebook/react+spacetimeline live-demo web-page.

http://kentutorialbook.github.io/demo/frp-redball-delay/index.html https://github.com/kentutorialbook/kentutorialbook.github.io/tree/master/demo/frp-redball-delay

In this live-demo, my FRP library records all mouse-move event with a time-stamp. Or all mouse-move event in 10 seconds duration. You can limit the time-line data size.

You can access any stream data aligned on time-line, or to be precise, the most recent event prior to any time-stamp, functionally.

In the live-demo, the code access the stream data on 1 seconds prior time-stamp to the current time-stamp.

var cursor = ___cursor.value(___('NOW').subtract(1, 'seconds'))

Please note ___cursor is the stream data appearing while the time-line proceeds to the future.

___cursor.appear(cursor);

Then SVG virtual DOM element is passed to react function.

As a result, you can re-play 1 seconds past world that you behaved.


(function() {

  var timelineCapacity = moment.duration(10, 'seconds');
  var ___cursor = ___(timelineCapacity);

  var onMouseMove = function(e) {
    var cursor = {
      x: e.clientX,
      y: e.clientY
    };
    ___cursor.appear(cursor); //the stream data appearing while time-line proceeding to the future
  };

 //----------------------------------------------------------------------------------
 //this logic is extra/option, but with this, the demo looks cooler, 
 //try to comment out, and see how it goes

  var interval = setInterval(function() {
     ___cursor.appear(___cursor.value(___('NOW')));
  }, 10);

 //-----------------------------------------------------------------------------------

  ___cursor.compute(function() {  
   // here is the final part where pure logic meets our physical world
   // in lazy evaluation context, this corresponds to  `toArray()`

    //var cursor = ___cursor.value(___('NOW'));
    //var cursor = ___cursor.value(moment().subtract(3, 'seconds'));
    var cursor = ___cursor.value(___('NOW').subtract(1, 'seconds'))

    var dom = <svg height = "600" width = "600">
              <circle cx = {  cursor.x  }  cy = {  cursor.y  }  r = "20"   fill = "red" />
              </svg>;

    React.render(dom, document.body);

  });

  document.addEventListener("mousemove", onMouseMove);

  //====================================
})();

That's my thought.

Regards.

sonewman commented 9 years ago

I think I understand the point you are making, and I think it all comes down to the granularity at which the data is being segmented and its consistency to that granularity.

For instance, each buffer / data chunk in the stream's state array "buffer" could be classified as immutable (unless it is mutated directly, e.g. a byte is mutated in a Buffer, or a property is mutated on a stream with objectMode), or the sum of all the data from a source could be conceived as immutable. A source could always send a prescriptively sized chunk, or it could be completely unpredictable.

_The stream state buffer (_readableState.buffer or _writableState.buffer) could never be immutable because its contents are forever changing, unless data is never appended to it._

Are the above examples referring to DVD/Webcam data, treating the sum of the contents as immutable? This calculation of state could be a few GB of data.

If we want to stream this data over a network connection (even before we have this final sum), we would all agree it would be hugely inefficient for us to send the whole amount of data indefinitely (obviously in the case of many small sized files this would not be a problem, and would be more performant to deal with in a single bound).

By breaking this data into smaller chunks we limit / control the resources needed to deal with the total (virtual) mass of that data over time.

In your examples with regards to tweets and mouse events, each chunk is always a fixed amount of data representing one chunk, and the data is always going to be of the same structure (an object containing the same keys, just with different values).

Because of the nature of this example we want to deal with each individual chunk, as a separate immutable source of truth.

In the case that we may want to send this data over the wire, this would however not be the most efficient way to send it, as smaller chunks would require lots of smaller writes, rather than fewer bigger buffered ones. In that case we may want to combine them all together, encoded in some messaging protocol, to be decoded at our destination and broken out into measurable immutable data chunks again. This intermediate stage would be an example where this data, though immutable, has changed consistency and needs to be dealt with in a completely different paradigm, though it is still an array of data over time.

The examples given of the tweets and mouse events are typical / common use cases of the type of streams needed and implemented by application / program developers, to handle (domain) data over time.

This is certainly how a lot of people like to use streams. The current implementations of streams (streams1, 2 & 3) all facilitate this functionality (the latter two obviously require objectMode construction flags), and many other modules have been created to aid in this type of pipeline (such as event-stream, etc...).

However if the fundamentals of the stream implementation were to be fixed to this type of use-case it could be restrictive, particularly when dealing with encoded data (typically implementing (or transforming between) some kind of text or binary protocol).

In this scenario the length of a buffer is not always predictable, it does not necessarily contain the same information structure as the last.

IMO for this reason a primitive approach would work better, because it allows for other paradigms to be constructed on top of it, rather than the other way round.

Again IMO streams need a flexible unopinionated low-level abstraction to allow other things to be cultivated on top of them without obstruction. Some users may not want to be forced into using immutable patterns in their code, since in the mutable world of JavaScript anything is possible!

To be clear, I am not saying that the underlying mechanisms for passing through data would not benefit from functional programming and I am not saying Immutable data structures handled by / in streams is not a great way of dealing with data over time.

A stream unfortunately due to its need for versatility, is always likely to be a state machine by design.

The goal is obviously to make streams as stateless, simple and flexible as possible to allow for diverse usage, while providing high performance for core's use-cases and a conventional interface, to allow pipelines of streams to be constructed and to remain backwards compatible, catering for the currently flourishing ecosystem developing on top of them.

ken-okabe commented 9 years ago

A Happy NewYear to all!

@sonewman

Thanks a lot, very informative. I think I should consider this issue harder by myself. Since I have not committed to this problem so far, your immutable/FRP consideration of my idea is truly valuable to me.

I will consider this more from now on, but so far, if there's a silver bullet, it would be FRP; if there's no such thing, all we can do is to set up a sweet spot.

Regards.

ken-okabe commented 9 years ago

About Immutable data for chunk.

How about this?

Add time-stamp to every chunk of data. Then the stream data becomes immutable.

stream.read(now), or something like that. I understand the current node stream employs a pull model. Pull data on a time-stamp basis. Then again, it's functional and referentially transparent.

The stream state buffer (_readableState.buffer or _writableState.buffer) could never be immutable because its contents are forever changing, unless data is never appended to it.

Since every chunk of data is immutable with its time-stamp, the stream state buffer also becomes immutable with the time-stamps. It's only a matter of which part of the immutable data you contain in the buffer.

sonewman commented 9 years ago

@kenokabe Happy New Year to you too.

I think this suggestion would be good for a certain use case. The problem is that .read(n) already has a valuable purpose. This dictates the number of bytes to be read from the underlying buffer. This involves some quite complex semantics by constructing the relevant "chunk" (of size n) from the internal array of Buffers. This chunk is always constructed from the front of the array of Buffers.

By allowing data to be plucked out of the array of Buffers at random points, it would leave strange chunks of data in the array. This would inevitably require some kind of clean-up or flushing.

This is not to say that you cannot override the .read() method in an inherited implementation. But getting a relevant chunk based on a timestamp would not be useful in the use case of core.

By using a timestamp, components interacting with the stream would need to know the specific timestamp of the chunk to access it (I don't know how this would work).

Another issue with this is that the timestamp of the chunk would only be relevant to the stream which added it, and to the Buffer instance the timestamp was added to.

Would this change when moving the buffer bytes around?

Although bytes in a buffer always refer to the same data in memory regardless of which Buffer instance they exist in, because we would be adding the timestamp to the instance itself and not to the bytes of data, these timestamps would easily be lost when buffers are .slice()d.

This is the problem with the low-level data which core streams deal with, in comparison to the high-level (more naïve) object streams (often found in user-land), since those usually have a consistent chunk (an object) of data.

ken-okabe commented 9 years ago

@sonewman

Thanks for your comment.

One more issue, which I think is strongly related to this topic, is Callbacks are imperative, promises are functional: Node’s biggest missed opportunity

With this definition in place, I want to address what I consider to be the biggest design mistake committed by Node.js: the decision, made quite early in its life, to prefer callback-based APIs to promise-based ones.

Writing correct concurrent programs basically comes down achieving as much concurrent work as you can while making sure operations still happen in the correct order. Although JavaScript is single threaded, we still get race conditions due to asynchrony: any action that involves I/O can yield CPU time to other actions while it waits for callbacks. Multiple concurrent actions can access the same in-memory data, or carry out overlapping sequences of commands against a database or the DOM. As I hope to show in this article, promises provide a way to describe problems using interdependencies between values, like in Excel, so that your tools can correctly optimize the solution for you, instead of you having to figure out control flow for yourself. I hope to dismiss the misunderstanding that promises are about having cleaner syntax for callback-based async work. They are about modeling your problem in a fundamentally different way; they go deeper than syntax and actually change the way you solve problems at a semantic level.

I feel sorry that I have not yet integrated these problems with the time-stamp idea and a promise-based implementation in my head, but I think it's better to share this idea block.

Regards.

joepie91 commented 9 years ago

Another point that I'd completely forgotten about: there should probably be some way to have a consistent (optional) seek implementation. For things like streaming out files from an archive it's essential to have seek functionality, but this is currently not supported in streams as far as I can tell.

Sure, you could have some module-specific extension to a stream to support seeking, but that would defeat the point of having a standard and interoperable stream interface.

sonewman commented 9 years ago

I'm not really sure I understand what you mean by 'seeking'. Do you mean seeking for a file, or seeking through a file?

Implementation of both is perfectly possible with the current streams implementation.

You would just need to create a stream which does the desired seeking, if it doesn't already exist: npmjs.com/search?q=stream+seek.

There is too much risk of feature / scope creep for me to think this would be a good idea to have in core. (Can it do this too..? and this...? How about this other thing..?)

If you are seeking the presence of a sequence of text or bytes across chunk boundaries, there are options for this.

I have written a module called byte-matcher, which facilitates buffer pattern matching using a naïve matching technique. I have also experimented with the Boyer-Moore-Horspool algorithm in JavaScript, but have yet to test it on a significant amount of data for that algorithm to provide a quicker benchmark result. (Maybe this could give you some inspiration.)

In short there are options out there that are relatively simple to learn and implement.

There is also an open issue(https://github.com/iojs/io.js/issues/95) and PR(https://github.com/iojs/io.js/pull/160) for providing an indexOf method to the Buffer.

Obviously implementing parsing over chunk boundaries still requires considerations.

joepie91 commented 9 years ago

@sonewman I'm referring to seeking in the sense of reading particular sections of a file using an existing descriptor (or, in this case, stream) - not just for filesystem files, but for any kind of stream that supports this or could reasonably abstract it. Again, I am aware that it is possible to implement this, but it is not part of a standardized API (like the read event or push method are, for example), so it's not possible to have a standardized implementation right now.

For the specific example of a filesystem stream, as far as I understand it, all current implementations do in-memory buffering of the entire stream (therefore defeating the point), and the only non-buffering approach I am aware of is to reuse a file descriptor when creating a new stream for every seek (and using the offset functionality), but I don't think this is a technically supported use case and I'd expect it to break at some point.

An example use case I was trying to work on recently was a vinyl stream from an archive; it would be an object stream, one object for each file in the archive, the 'contents' for each of those files being a stream itself. To accomplish this without having to open an fd for every file in the archive individually, I'd have to somehow be able to have an offset linked to each of the 'substreams', referring to a specific section in the archive file, all using the same fd. That doesn't appear to be 'officially' possible with the current streams API.

nfroidure commented 9 years ago

@joepie91 it looks like fs.createReadStream takes a "fd" option in http://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options

Wouldn't it work for the case you mention?
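A hedged sketch of that approach: one descriptor shared by several bounded read streams, with autoClose disabled so a substream does not close the archive's fd; the entry offsets are hypothetical:

var fs = require('fs');

fs.open('archive.bin', 'r', function (err, fd) {
  if (err) throw err;

  // hypothetical entry offsets, e.g. parsed from the archive's index
  var entry = { start: 512, end: 1535 }; // 'end' is inclusive

  var contents = fs.createReadStream(null, {
    fd: fd,            // reuse the already-open descriptor; the path is ignored
    start: entry.start,
    end: entry.end,
    autoClose: false   // keep the fd open for the other entries
  });

  contents.pipe(process.stdout);
});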

sonewman commented 9 years ago

@nfroidure :+1:

There is also creationix/seekable by @creationix, which allows you to seek through a stream in a similar way.

sonewman commented 9 years ago

Also:

An example use case I was trying to work on recently was a vinyl stream from an archive; it would be an object stream, one object for each file in the archive, the 'contents' for each of those files being a stream itself. To accomplish this without having to open an fd for every file in the archive individually, I'd have to somehow be able to have an offset linked to each of the 'substreams', referring to a specific section in the archive file, all using the same fd. That doesn't appear to be 'officially' possible with the current streams API.

Is a very unique use-case. I don't think the stream itself needs additional API to deal with this. There is absolutely nothing to stop you adding additional methods to your stream implementation to facilitate this functionality.

Are you using streams 2 or 3?