nodejs / node

Node.js JavaScript runtime ✨🐢🚀✨
https://nodejs.org

[stream] support piping multiple streams into a single stream #93

Closed: missinglink closed this issue 9 years ago

missinglink commented 9 years ago

In the following example, sink should wait for all piped streams to unpipe() before calling _flush.

As it currently stands, the following code outputs:

$ node test1.js 
a
done

If you uncomment the applyFix(); line you get:

$ node test1.js 
a
b
done

var through = require('through2');

var tap1 = through.obj();
var tap2 = through.obj();
var sink = through.obj(transform, flush);

var pipes = 0;
// applyFix();

tap1.pipe(sink);
tap2.pipe(sink);

tap1.write('a');
tap1.end(); // this also ends sink, before tap2 has written anything

setTimeout(function(){
  tap2.write('b');
  tap2.end();
}, 100);

// workaround: count 'pipe' events and only flush once every
// piped source has called end()
function applyFix(){
  sink.on( 'pipe', function(){
    pipes++;
  });
  sink.end = function(){
    if( !--pipes ){
      sink._flush();
    }
  };
}

function flush(){
  console.log( 'done' );
}

function transform( item, e, next ){
  console.log( item );
  this.push(item);
  next();
}

ref: https://github.com/iojs/io.js/issues/89

chrisdickinson commented 9 years ago

Ah, interesting -- it seems like the pipe mechanism isn't designed to handle multiple sources piping into a single sink (it assumes the source is the only stream writing into the destination). This should definitely be made explicit in the docs, at the very least.

I'm curious: what is your use case for piping multiple streams into a single stream?

Also, do you mind editing the title to "[stream] support piping multiple streams into a single stream"?

missinglink commented 9 years ago

For me this is a pretty common case when forking the output from a parser to separate pipelines (one per 'type' of record) and then saving to a database using a db client singleton sink.
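
A minimal sketch of that fork-by-type pattern, using the same through2 module as the repro (the record types and the db sink are illustrative, not from the original setup). Note that pipe's { end: false } option keeps each pipeline from ending the shared sink, though you would still need to track when every pipeline has finished before flushing, as in applyFix above.

var through = require('through2');

// hypothetical singleton sink: pretend it writes records to a db
var dbSink = through.obj(function (record, enc, next) {
  console.log('saving', record.type, record.value);
  next();
});

// one pipeline per record type
var addresses = through.obj();
var streets = through.obj();

// { end: false } stops each pipeline from ending the shared sink,
// so the first pipeline to finish does not close it for the others
addresses.pipe(dbSink, { end: false });
streets.pipe(dbSink, { end: false });

// the parser forks each record onto the matching pipeline
var parser = through.obj(function (record, enc, next) {
  if (record.type === 'address') addresses.write(record);
  else streets.write(record);
  next();
});

parser.on('finish', function () {
  addresses.end();
  streets.end();
});

parser.write({ type: 'address', value: '1 Main St' });
parser.write({ type: 'street', value: 'Main St' });
parser.end();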

phpnode commented 9 years ago

Another use case - piping data from multiple clients into one stream and then piping the stream back into each client, e.g. for chat rooms.
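
A rough sketch of that chat-room shape, assuming a plain TCP server (the port and structure are illustrative, not from the original comment); the { end: false } option is what keeps one client's disconnect from ending the shared stream for everyone else:

var net = require('net');
var through = require('through2');

var room = through(); // shared pass-through all clients write into

net.createServer(function (socket) {
  // each client writes into the room and reads everything from it
  // (including its own messages, in this simplified version)
  socket.pipe(room, { end: false });
  room.pipe(socket, { end: false });
  socket.on('close', function () {
    room.unpipe(socket);
  });
}).listen(9000);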

vkurchatkin commented 9 years ago

this only works for object mode streams, and only if the order of objects is not significant

benjamingr commented 9 years ago

+1 for making this explicit in the docs. Are we sure this change won't break existing code though? Also, what vkurchatkin said.

indutny commented 9 years ago

Summoning @caineio

sonewman commented 9 years ago

This is interesting because it is going to be much more prevalent in situations where you have a stream hanging around and potentially never being ended.

Streams are used best when they are created, handle a specific flow of data, and are then disposed of. But I can see a use case for persisted streams with multiple sources of input for aggregation, in addition to multiple outputs (which does already work). Should this discussion be moved to https://github.com/isaacs/readable-stream?

The complexity of this issue is considerable, however, particularly if one readable-stream is pushing data while the other is expected to be pulled via '.read()'.

chrisdickinson commented 9 years ago

should this discussion be moved to https://github.com/isaacs/readable-stream?

Nope! Here is fine.

This is interesting because it is going to be much more prevalent in situations where you have a stream hanging around and potentially never being ended.

It's definitely worth looking into what happens to tap2 when sink has ended due to tap1 -- does it unpipe? Does it end? Does it remain open?

sonewman commented 9 years ago

hmm, interesting... because when a readable stream ends (via .push(null)), although it calls .end() on the writable it might be piping to, the writable definitely does not emit anything back up the chain (by default) to anything else that might be piping into it. So if the second stream were to start pushing data through again, you would get a write-after-end error.
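
That failure mode is easy to reproduce with the same through2 streams as the original example; the 'write after end' error fires on the destination:

var through = require('through2');

var src1 = through.obj();
var src2 = through.obj();
var dest = through.obj();

src1.pipe(dest);
src2.pipe(dest);

dest.on('error', function (err) {
  console.log(err.message); // "write after end"
});

src1.end('a'); // pipe propagates this and ends dest

setTimeout(function () {
  src2.write('b'); // arrives after dest has already ended
}, 100);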

sonewman commented 9 years ago

Another interesting edge case is that we have the functionality to have a readable stream, which can be ended but will not call .end() on the writables it is piping to, but we can't do it the other way round.

For instance, if we want to say 'this readable is staying open; it will continue to get data (and buffer it up), but everything which is consuming that readable stream right now must stop'. We can apply back pressure, but that isn't really the same; we would have to know manually when that point was/is and unpipe each destination individually.
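
For what it's worth, readable.unpipe() with no argument already detaches every piped destination at once, which is the closest existing primitive to 'all consumers must stop'; the readable then buffers internally up to its high-water mark. A small sketch:

var through = require('through2');

var source = through.obj();
var consumerA = through.obj();
var consumerB = through.obj();

source.pipe(consumerA);
source.pipe(consumerB);

source.unpipe();  // detach both consumers in one call
source.write('x'); // now buffers inside source until something re-pipes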

greelgorke commented 9 years ago

IMHO the core streams should remain as basic as possible. For merging several streams into one there are already a few modules on npm, and merging can follow different strategies: unordered merge, merge with a function, merge in pipe-order, etc.

sonewman commented 9 years ago

The main problem is that streams have been created for Node's particular use cases, and each of them has different constraints. E.g. net.Socket, process.stdin, http.IncomingMessage, etc. The ideology has been created to fit specific needs, based on the complexity of other internal components.

I think in general, @trevnorris's idea of building low-level APIs wrapping the C/C++ libs (libuv, v8, http-parser) means that we can start building abstractions on a simpler base, now that we know what io.js is supposed to do.

The previous development has always seemed very organic. But now we have lessons learnt from where we are, and better abstractions can be built on a simpler base, inevitably making the high-level APIs simpler as a result.

It is always tricky to re-write the underlying foundations of some software without compromising backwards compatibility. But I believe we are in a better place than ever now, with the most talented individuals to accomplish the task!

zensh commented 9 years ago

@missinglink https://github.com/teambition/merge2 merges multiple streams into a single stream, in sequence or in parallel.

I use it in gulp.
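
For reference, a hedged sketch of that gulp usage (the task name and globs are made up; as I read merge2's API, top-level arguments are merged in sequence, while streams wrapped in an array are merged in parallel):

var gulp = require('gulp');
var merge2 = require('merge2');

gulp.task('build', function () {
  // the array merges the two src streams in parallel;
  // passing them as separate arguments would consume them in sequence
  return merge2([
    gulp.src('src/**/*.css'),
    gulp.src('src/**/*.js')
  ]).pipe(gulp.dest('dist'));
});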

ken-okabe commented 9 years ago

In my humble opinion, the streams of Node/io.js have fundamental problems. I have never liked them, and the fundamental problem with them is obvious to me.

I understand that all streams are instances of EventEmitter.

As you know, it is very hard to make everything work well with this implementation. This is simply because a stream is data on a time-line, and so far the Node developers have not solved this time-line problem well.

In order to solve this time-line problem fundamentally, you must employ the Functional Reactive Programming (FRP) paradigm.

What is (functional) reactive programming?

I do resonate with Laurence G's simple description that FRP is about "datatypes that represent a value 'over time' ". Conventional imperative programming captures these dynamic values only indirectly, through state and mutations. The complete history (past, present, future) has no first class representation. Moreover, only discretely evolving values can be (indirectly) captured, since the imperative paradigm is temporally discrete. In contrast, FRP captures these evolving values directly and has no difficulty with continuously evolving values.

FRP is also unusual in that it is concurrent without running afoul of the theoretical & pragmatic rats' nest that plagues imperative concurrency. Semantically, FRP's concurrency is fine-grained, determinate, and continuous. (I'm talking about meaning, not implementation. An implementation may or may not involve concurrency or parallelism.) Semantic determinacy is very important for reasoning, both rigorous and informal. While concurrency adds enormous complexity to imperative programming (due to nondeterministic interleaving), it is effortless in FRP.

So, what is FRP? You could have invented it yourself. Start with these ideas:

  • Dynamic/evolving values (i.e., values "over time") are first class values in themselves. You can define them and combine them, pass them into & out of functions. I called these things "behaviors".
  • Behaviors are built up out of a few primitives, like constant (static) behaviors and time (like a clock), and then with sequential and parallel combination. n behaviors are combined by applying an n-ary function (on static values), "point-wise", i.e., continuously over time.
  • To account for discrete phenomena, have another type (family) of "events", each of which has a stream (finite or infinite) of occurrences. Each occurrence has an associated time and value.

and so on. Read more at the link above.

Do not push or pull data; instead, leave the data alone, treat the entire evented-IO data as an immutable stream on a time-line, and let functions consume that stream data.

Node/io.js is for JavaScript, and JavaScript is a functional language. So why not make the io.js stream an FRP stream? Currently, it is not.


piping multiple streams into a single stream?

Easy. An FRP stream can be operated on in any way through functional programming.
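
To make that concrete, here is a toy illustration (a conceptual sketch, not an io.js API): an event stream modelled as a function from subscriber to unsubscriber, with merge as a pure combinator.

function fromEmitter(ee, event) {
  // adapt an EventEmitter event into a subscribable stream
  return function subscribe(emit) {
    ee.on(event, emit);
    return function unsubscribe() { ee.removeListener(event, emit); };
  };
}

function merge(streamA, streamB) {
  // merging is just subscribing to both; no mutation involved
  return function subscribe(emit) {
    var offA = streamA(emit);
    var offB = streamB(emit);
    return function unsubscribe() { offA(); offB(); };
  };
}

// usage: merge(fromEmitter(a, 'data'), fromEmitter(b, 'data'))(console.log)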


Node/io.js is a back-end technology, and currently its fundamental data structure, the stream, is not FRP-based.

On the other hand, as a front-end technology, Facebook is currently committing to FRP-based projects extremely actively. GitHub decided to employ facebook-react.

React A declarative, efficient, and flexible JavaScript library for building user interfaces. http://facebook.github.io/react/ https://github.com/facebook/react

IMMUTABLE Immutable persistent data collections for Javascript which increase efficiency and simplicity. http://facebook.github.io/immutable-js/ https://github.com/facebook/immutable-js

As I said, Node/io.js stream is not declarative or immutable, and that is the fundamental problem.

Also, I already have some conceptual FRP code of my own. It's very powerful, with a very simple idea. If you are interested, let me know.

Regards.

Fishrock123 commented 9 years ago

io.js is Evented IO for V8 JavaScript

We need to get rid of this already, because it really isn't just/quite that anymore.

ken-okabe commented 9 years ago

Ok, if you say so. I removed that mention from my comments, and my opinion still stands.

drkibitz commented 9 years ago

I would love to simply have support (without npm dependencies) for stream functions such as map, filter, reduce, merge, and concat. The list goes on, but fundamentally the API is already mapped out on Array.prototype. There are many ways to think of streams, but the most useful and accurate is in the FRP excerpt from the comment by @kenokabe. Here's another way to put it, if you are questioning this or want another version of a stream API: stream operations and array operations are interchangeable. I would not want an implementation of the following library itself in core, but I would still recommend taking cues from Rx (Reactive Extensions, specifically Rx.js).
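
For illustration, a sketch of what such helpers could look like on top of the existing Transform class (map and filter here are hypothetical, not an existing core API):

var Transform = require('stream').Transform;

function map(fn) {
  // transform each item with fn and pass it downstream
  var t = new Transform({ objectMode: true });
  t._transform = function (item, enc, next) {
    next(null, fn(item));
  };
  return t;
}

function filter(predicate) {
  // only pass items downstream when predicate(item) is truthy
  var t = new Transform({ objectMode: true });
  t._transform = function (item, enc, next) {
    if (predicate(item)) this.push(item);
    next();
  };
  return t;
}

// usage: source.pipe(filter(isValid)).pipe(map(normalize)).pipe(sink)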

bjouhier commented 9 years ago

@drkibitz This is exactly what I'm proposing with ez-streams. There is also potential convergence with lazy.js.

There are too many issues related to streams, which makes discussions hard to follow. Maybe we should elect one of them and close the others. #89 "Streams pain points" sounds like a good title for the discussion.

ken-okabe commented 9 years ago

I forgot to mention @dominictarr's library:

https://github.com/dominictarr/event-stream EventStream is like functional programming meets IO

sonewman commented 9 years ago

As I mentioned in https://github.com/iojs/io.js/issues/89, FRP immutables are great.

These data structures are great at handling 'the evolution of data over time', or 'the lifecycle of a data set'.

I think a lot of inspiration can be drawn from both FRP and ez-streams. Event-stream is essentially a toolbox that makes typical array-like functionality over streams more trivial by providing additional sugar.

This discussion seems to have gone full circle.

We are complaining about the current state of streams, saying they are "broken" and should be replaced with FRP-styled data structures, only to quote libraries that have been around since the days of streams1 and already deal with the problem.

For developers building programs on top of core, these types of constructs are very useful.

Core's requirement for streams is fundamentally to deal with encoded data, received as multiple chunks over time from an underlying handle or data source. There is no necessity to introduce the added overhead of this type of syntactic sugar into core.

Yes, we should have discussions about the core streaming API and conventions, and yes, we should have discussions about pain points where the implementation causes friction and obstructs the generic ways we as developers may want to use streams.

And yet a separate topic should be a discussion on the internal implementation: whether it could be made simpler, whether the core implementation needs to rely on an inherited EventEmitter, etc.

There are also considerations required, for both of the above, about maintaining backwards compatibility for the ecosystem which has been built on top of these streams (which, given their popularity, must have done something right!).

But these are much different discussions to how we want to use streams with specifics to our programs. If we are just looking for a more convenient API that suits our programming needs, these can easily be fleshed out on npm, if they have not been already! I think this is really where that kind of innovation and discussion belongs.

ken-okabe commented 9 years ago

Thanks @sonewman

Due to my multiple posts, I will follow this discussion in the #89 thread, and have replied there: https://github.com/iojs/io.js/issues/89#issuecomment-68178322

Regards

Fishrock123 commented 9 years ago

I think further discussion can move to https://github.com/iojs/readable-stream for actual implementation proposals / etc.