dominictarr / mux-demux

multiplex-demultiplex multiple streams through a single text Stream
MIT License

proposal for new syntax (II) #22

Open juliangruber opened 11 years ago

juliangruber commented 11 years ago

Every time I use mux-demux I wish for something more high-level, because I notice that I need to put brain power into it, even though all it does is (de)multiplexing, and that should be simple.

So, either make this a new module, or somehow make it backwards compatible, but I'm thinking of this:

It shouldn't matter whether the client or the server creates a stream, they should just be able to use it rather than create it.

var MuxDemux = require('mux-demux');
var mdm = MuxDemux();

mdm('chat').pipe(myChatStream).pipe(mdm('chat'));

mdm.pipe(net.connect(3000)).pipe(mdm);

So mdm(name) should return the stream associated with the name, or create it. Each of those streams is paused and buffers while the connection is down or not yet established.

You also wouldn't associate as much meta data with a stream, but rather fit it all into the name. So if you have one stream per room in a chat, you'd have

mdm('room-<id>')

However, mdm.on('connection', fn) could still work.
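
A minimal sketch of that memoized accessor idea (the helper name and the stand-in factory are hypothetical, not part of the current mux-demux API):

```javascript
// Sketch: mdm(name) returns the stream registered under `name`,
// creating it on first use and memoizing it afterwards.
function makeAccessor(createStream) {
  var channels = {};
  return function (name) {
    if (!channels[name]) channels[name] = createStream(name);
    return channels[name];
  };
}

// Usage with a stand-in factory instead of a real mux:
var calls = 0;
var mdm = makeAccessor(function (name) {
  calls++;
  return { name: name }; // stand-in for a real Stream
});

var a = mdm('chat');
var b = mdm('chat'); // same object, no second create
```

The point is that both sides can call mdm('chat') in any order and always end up talking about the same stream.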

This is somewhat like the API socket.io provides for multiplexing, e.g. io.of('chat'). I very much like how this would reduce our brain overhead once it's implemented.

What do you think?

juliangruber commented 11 years ago

The example above written using the current api:

var MuxDemux = require('mux-demux');
var mdm = MuxDemux();

mdm.on('connection', function (stream) {
  if (stream.meta == 'chat') {
    stream.pipe(myChatStream).pipe(stream);
  }
});

// net

And that only on the server, on the client you'd need

var MuxDemux = require('mux-demux');
var mdm = MuxDemux();

myChatStream.pipe(mdm.createStream('chat')).pipe(myChatStream);

// net
juliangruber commented 11 years ago

actually this can very well be implemented on top of the current MuxDemux as a new module

juliangruber commented 11 years ago

Ok, I see the problem with this room-based approach: Backpressure doesn't work. That's a bummer :(

Raynos commented 11 years ago

This depends. For server <-> client having the client call createStream and having the server listen to "connection" is a good API.

For peer <-> peer having a symmetrical API is nice. The problem with that is currently meta is just meta information, a header for the stream. It's not an identity for the stream. So you can call createStream("1") n times and it creates n streams and emits n connection events with "1" as the name.

Having a separate API like createChannel(ID) might be useful for symmetrical p2p APIs where both ends open a connection, apply mdm on top of it and then open a channel. This would work under the assumption that both ends need to call createChannel(ID), and that calling createChannel(ID) twice either throws an error or returns the first (memoized) stream.
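
The contrast between the two semantics can be sketched like this (createChannel here is hypothetical, and the streams are stand-in objects, not real Streams):

```javascript
// Sketch: createStream treats meta as a header, so each call opens a
// fresh stream; createChannel would treat the id as identity and memoize.
function Mux() {
  var opened = 0;
  var channels = {};
  this.createStream = function (meta) {
    opened++;
    return { meta: meta, id: opened }; // stand-in for a real stream
  };
  this.createChannel = function (id) {
    if (channels[id]) return channels[id]; // same id, same stream
    return channels[id] = this.createStream(id);
  };
}

var mdm = new Mux();
var s1 = mdm.createStream('1');
var s2 = mdm.createStream('1'); // two distinct streams, same meta
var c1 = mdm.createChannel('2');
var c2 = mdm.createChannel('2'); // the memoized first channel
```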

juliangruber commented 11 years ago

yup, I imagine the stream id being unique and always returning the same stream

juliangruber commented 11 years ago

but this api will also work for server <-> client topologies. Instead of a connection event on the server, a client can write its id to a channel called join when it connects.

// Client
var mdm = require('mux-demux')();
mdm('join').write('juliangruber');

// Server
var mdm = require('mux-demux')();
mdm('join').on('data', function (nick) {
  mdm('user-' + nick).pipe(/* ... */)
})
dominictarr commented 11 years ago

I like this idea.

meta is sent over a special createStream message, and each stream is assigned a new random id underneath.

How @juliangruber's suggestion would probably be implemented is by assigning meta to be the id of the stream, and not having an explicit createStream message. If meta information is needed, it can be sent as a header section in the first part of the stream.

Since this is a new approach, and mux-demux is stable, I think I'd rather see this new idea implemented as a new module (although perhaps it could be created by deleting parts of mux-demux)...

I don't see any reason why back pressure couldn't be made to work. mux-demux has back-pressure - it sends pause/resume messages.
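
A hedged sketch of how per-stream pause/resume could ride over one carrier (the { id, event } frame shape here is an assumption, not the actual mux-demux wire format):

```javascript
// Sketch: dispatch a control frame to the matching local stream.
function handleControl(streams, frame) {
  var s = streams[frame.id];
  if (!s) return; // unknown stream id: ignore the frame
  if (frame.event === 'pause') s.paused = true;
  else if (frame.event === 'resume') s.paused = false;
}

// Two local streams multiplexed over one carrier:
var streams = {
  a: { paused: false },
  b: { paused: false }
};
handleControl(streams, { id: 'a', event: 'pause' });   // only 'a' slows down
handleControl(streams, { id: 'zzz', event: 'pause' });  // ignored
```

Because each frame targets a single stream id, back-pressure on one channel doesn't touch its siblings on the same carrier.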

One other thing I'd recommend is to use redis-protocol-stream to implement the multiplexing, rather than line-separated json arrays; currently json streams are parsed/stringified twice. (I've been meaning to fix this, but haven't got around to it yet)
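
An illustrative sketch of that double-serialization cost (the envelope shape is assumed for illustration, not the actual mux-demux framing):

```javascript
// The payload is stringified once by the channel codec...
var payload = { hello: 'world' };
var inner = JSON.stringify(payload);                        // pass 1
// ...then the whole envelope is stringified again by the carrier.
var frame = JSON.stringify(['42', 'data', inner]) + '\n';   // pass 2

// Decoding has to reverse both passes:
var envelope = JSON.parse(frame);
var decoded = JSON.parse(envelope[2]);
```

A length-prefixed framing (as in the redis protocol) could carry the already-stringified payload as an opaque chunk, skipping the second pass.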

juliangruber commented 11 years ago

re back pressure:

Say I pipe a file to the file channel:

fs.createReadStream('file').pipe(mdm('file'))

Then, two listeners are there in the file channel

mdm('file').pipe(fs.createWriteStream('backup'))
mdm('file').pipe(analyzeDataStream)

How would backpressure work without one writable stream slowing down the other? That relates to another issue where you said that you must only create streams inside the connection listener, for every connection, and not before.

dominictarr commented 11 years ago

ah right - this is a problem with any stream: you'd have to slow down the stream for one without slowing it for the other. You could buffer it for the slow one, but that would be worse than just slowing the stream.

The best approach, I think, is to make sure that secondary streams are "light weight" and will not slow the first stream.

juliangruber commented 11 years ago

So you say just apply backpressure to the source as soon as one of the destinations requests it?

dominictarr commented 11 years ago

Yeah, that is just how streams work.

Generally - it's just a simple A -> B -> C pipeline, not a forked one. If you want to make a case for handling quality of service in a forked pipeline, you'd need to start from a compelling real-world use-case. I suspect, though, that in most cases only one of the destinations would be the bottleneck, so you could effectively ignore the 'light weight' destination.
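
The rule being agreed on above - pause the source as soon as any destination asks - can be sketched as follows (the fork helper and the stand-in source/destination objects are hypothetical, not mux-demux API):

```javascript
// Sketch: a forked pipe lets its source flow only while every
// destination is ready, so the slowest consumer sets the pace.
function fork(source, destinations) {
  return function update() {
    var blocked = destinations.some(function (d) { return d.paused; });
    if (blocked) source.pause();
    else source.resume();
  };
}

// Stand-in source that just records its state:
var state = { flowing: true };
var src = {
  pause: function () { state.flowing = false; },
  resume: function () { state.flowing = true; }
};
var dests = [{ paused: false }, { paused: false }];
var update = fork(src, dests);

dests[1].paused = true;  update(); // slow consumer blocks the source
dests[1].paused = false; update(); // everyone ready, source flows again
```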

I'm not sure what that would be; I think most of the multi-destination cases are logging or analytics, which should be pretty light-weight.

The best multi-stream case I can think of is uploading a file and saving it on multiple nodes; then you'd need to split the stream, and also handle the case where some of the destinations are down.