davedoesdev / bpmux

Node stream multiplexing with back-pressure on each stream
MIT License
18 stars 2 forks source link

bpmux   Build Status Coverage Status NPM version

Node stream multiplexing with back-pressure on each stream.

The API is described here.

Example

Multiplexing multiple streams over a single TCP stream:

var net = require('net'),
    crypto = require('crypto'),
    assert = require('assert'),
    BPMux = require('bpmux').BPMux,
    sent = [];

// Server side: demultiplex the streams carried over each TCP connection and
// check that the payloads received match the payloads the client sent.
net.createServer(function (c)
{
    var received = [], ended = 0;

    // A handshake event is emitted for each new multiplexed stream the peer
    // creates on the carrier.
    new BPMux(c).on('handshake', function (duplex)
    {
        var accum = '';

        // Accumulate the stream's data as a hex string.
        duplex.on('readable', function ()
        {
            var data = this.read();
            if (data)
            {
                accum += data.toString('hex');
            }
        });

        duplex.on('end', function ()
        {
            received.push(accum);

            ended += 1;
            assert(ended <= 10);
            if (ended === 10)
            {
                // Sort both lists so the comparison does not depend on the
                // order in which the streams ended. deepStrictEqual is used
                // rather than the legacy, loosely-comparing deepEqual.
                assert.deepStrictEqual(received.sort(), sent.sort());
            }
        });
    });
}).listen(7000, function ()
{
    // Client side: one TCP connection carrying ten multiplexed streams.
    var mux = new BPMux(net.createConnection(7000)), i;

    // Open a new multiplexed stream, write n * 100 random bytes to it, end
    // it, and remember the payload (as hex) for the server-side check.
    function multiplex(n)
    {
        var data = crypto.randomBytes(n * 100);
        mux.multiplex().end(data);
        sent.push(data.toString('hex'));
    }

    for (i = 1; i <= 10; i += 1)
    {
        multiplex(i);
    }
});

Another Example

Multiple return pipes to the browser, multiplexed over a single Primus connection:

var PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    BPMux = require('bpmux').BPMux,
    http = require('http'),
    path = require('path'),
    crypto = require('crypto'),
    stream = require('stream'),
    assert = require('assert'),
    finalhandler = require('finalhandler'),
    serve_static = require('serve-static'),
    Primus = require('primus'),
    serve = serve_static(__dirname);

// Static file server for the browser-side files, with Primus attached to it
// once it is listening.
http.createServer(function (req, res)
{
    serve(req, res, finalhandler(req, res));
}).listen(7500, function ()
{
    // `this` is the HTTP server which has just started listening.
    var primus = new Primus(this);

    primus.on('connection', function (spark)
    {
        // One BPMux per browser connection, carried over the Primus spark.
        var mux = new BPMux(new PrimusDuplex(spark)), ended = 0, i;

        // Open multiplexed stream number n, send 10KiB of random data down
        // it and check that exactly the same bytes are read back from it.
        function multiplex(n)
        {
            var buf = crypto.randomBytes(10 * 1024),
                buf_stream = new stream.PassThrough(),
                bufs = [],
                // The stream number travels in the handshake data; the
                // browser-side loader logs its first byte.
                duplex = mux.multiplex({ handshake_data: Buffer.from([n]) });

            buf_stream.end(buf);
            buf_stream.pipe(duplex);

            // Drain everything currently buffered each time data is readable.
            duplex.on('readable', function ()
            {
                var data;

                while (true)
                {
                    data = this.read();
                    if (data === null)
                    {
                        break;
                    }
                    bufs.push(data);
                }
            });

            duplex.on('end', function ()
            {
                console.log('end', n);
                ended += 1;
                assert(ended <= 10);
                // deepStrictEqual is used rather than the legacy,
                // loosely-comparing deepEqual.
                assert.deepStrictEqual(Buffer.concat(bufs), buf);
            });
        }

        for (i = 0; i < 10; i += 1)
        {
            multiplex(i);
        }
    });

    console.log('Point your browser to http://localhost:7500/loader.html');
});

The HTML (loader.html) for the browser-side of this example:

<html>
  <head>
    <title>BPMux Test Runner</title>
    <!-- All three scripts are loaded from the example server on port 7500:
         the Primus client library, the webpack bundle which provides
         PrimusDuplex and BPMux, and then the loader which uses them. -->
    <script type="text/javascript" src="/primus/primus.js"></script>
    <script type="text/javascript" src="bundle.js"></script>
    <script type="text/javascript" src="loader.js"></script>
  </head>
  <!-- doit() is defined in loader.js. -->
  <body onload='doit()'>
  </body>
</html>

The browser-side code (loader.js):

/**
 * Page entry point, called from the onload handler in loader.html.
 *
 * Opens a Primus connection, wraps it in a PrimusDuplex and multiplexes over
 * it with BPMux. Every stream the peer opens is echoed back to the peer, and
 * the first byte of its handshake data is logged when the handshake arrives
 * and again when the stream ends.
 */
function doit()
{
    const socket = new Primus({ strategy: false });
    const carrier = new PrimusDuplex(socket);
    const mux = new BPMux(carrier);

    mux.on('handshake', (duplex, handshake_data) =>
    {
        console.log("handshake", handshake_data[0]);

        // Echo: whatever is read from the stream is written straight back.
        duplex.pipe(duplex);

        duplex.on('end', () =>
        {
            console.log('end', handshake_data[0]);
        });
    });
}

The browser-side dependencies (bundle.js) can be produced by webpack from:

// Entry file for the webpack bundle. The assignments have no var/let/const:
// loader.js refers to PrimusDuplex and BPMux as free identifiers, so
// presumably they are meant to become globals visible to it.
PrimusDuplex = require('primus-backpressure').PrimusDuplex;
BPMux = require('bpmux').BPMux;

Comparison

multiplex library

Multiplexing libraries which don't exert backpressure on individual streams suffer from starvation: a stream which doesn't read its data stops the other streams on the multiplex from getting theirs.

Here's a test using the multiplex library:

// Uses https://github.com/maxogden/multiplex (npm install multiplex)
// Backpressure is exerted across the multiplex as a whole, not individual streams.
// This means a stream which doesn't read its data starves the other streams.

const fs = require('fs');
const net = require('net');
const multiplex = require('multiplex');

// Server: demultiplex the connection and log the size of every chunk received.
net.createServer(c => {
    c.pipe(multiplex((stream, id) => {
        stream.on('data', function(d) {
            console.log('data', id, d.length);
            // Stop reading from stream '0' as soon as it delivers data.
            if (id === '0') {
                this.pause();
            }
        });
    }));
}).listen(7000, () => {
    // Client: two streams over one connection, each fed endless random data.
    const plex = multiplex();
    plex.pipe(net.createConnection(7000));

    const stream1 = plex.createStream();
    const stream2 = plex.createStream();

    fs.createReadStream('/dev/urandom').pipe(stream1);
    fs.createReadStream('/dev/urandom').pipe(stream2);
});

When the first stream is paused, backpressure is applied to the second stream too, even though it hasn't been paused. If you run this example, you'll see:

$ node multiplex.js 
data 0 65536
data 1 65536

bpmux doesn't suffer from this problem since backpressure is exerted on each stream separately. Here's the same test:

// BPMux exerts backpressure on individual streams so a stream which doesn't
// read its data doesn't starve the other streams.

const fs = require('fs');
const net = require('net');
const { BPMux } = require('bpmux');

// Server: log the size of every chunk received on each multiplexed stream.
net.createServer(c => {
    new BPMux(c).on('handshake', stream => {
        stream.on('data', function (d) {
            console.log('data', stream._chan, d.length);
            // Stop reading from channel 0 as soon as it delivers data.
            if (stream._chan === 0) {
                this.pause();
            }
        });
    });
}).listen(7000, () => {
    // Client: two streams over one connection, each fed endless random data.
    const mux = new BPMux(net.createConnection(7000));

    const stream1 = mux.multiplex();
    const stream2 = mux.multiplex();

    fs.createReadStream('/dev/urandom').pipe(stream1);
    fs.createReadStream('/dev/urandom').pipe(stream2);
});

The second stream continues to receive data when the first stream is paused:

data 0 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
data 1 16384
...

HTTP/2 sessions

HTTP/2 sessions do exert backpressure on individual streams, as this test shows:

const fs = require('fs');
const http2 = require('http2');

// Server: log the size of every chunk received on each request stream and
// stop reading from the /stream1 request as soon as it delivers data.
const server = http2.createServer();

server.on('stream', (stream, headers) => {
    const path = headers[':path'];

    stream.on('data', (chunk) => {
        console.log('data', path, chunk.length);
        if (path !== '/stream1') {
            return;
        }
        stream.pause();
    });
});

server.listen(8000);

// Client: two request streams on one session, each fed endless random data.
const client = http2.connect('http://localhost:8000');

const [stream1, stream2] = ['/stream1', '/stream2'].map(
    (path) => client.request({ ':path': path }, { endStream: false }));

for (const sink of [stream1, stream2]) {
    fs.createReadStream('/dev/urandom').pipe(sink);
}

Running it shows the second stream still receiving data after the first is paused:

data /stream1 16384
data /stream2 16384
data /stream2 16348
data /stream2 35
data /stream2 16384
data /stream2 16384
data /stream2 1
data /stream2 16384
data /stream2 16366
data /stream2 18
data /stream2 16384
data /stream2 16382
data /stream2 2
data /stream2 16384
...

If you pass a pair of sessions (one client, one server) to BPMux(), they will be used for multiplexing streams, with no additional overhead. This is useful if you want to use the bpmux API.

Errors

bpmux will emit error events on multiplexed streams if their underlying (carrier) stream closes before they have closed. The error object will have one of the following messages:

carrier stream finished before duplex finished
carrier stream ended before end message received

and have a property carrier_done set to true.

As this is an error event, you must register an error event listener on each multiplexed stream if you don't want the Node process to exit.

The reasoning behind emitting error events on open multiplexed streams when their carrier closes is:

If you do register error event listeners, make sure you do so for streams you multiplex using multiplex() and for streams you receive using the handshake or peer_multiplex events.

BPMux objects will also re-emit any error events their carrier stream emits.

Installation

npm install bpmux

Licence

MIT

Test

Over TCP (long test):

grunt test

Over TCP (quick test):

grunt test-fast

Over Primus (using nwjs to run browser- and server-side):

grunt test-browser

The examples at the top of this page:

grunt test-examples

Code Coverage

grunt coverage

c8 results are available here.

Coveralls page is here.

Lint

grunt lint

API

new Http2Sessions(client, server)

Class for holding a pair of HTTP/2 sessions.

Pass this to BPMux() and it will use the sessions' existing support for multiplexing streams. Both client and server sessions are required because HTTP/2 push streams are unidirectional.

Parameters:

Go: TOC

BPMux(carrier, [options])

Constructor for a BPMux object which multiplexes more than one stream.Duplex over a carrier Duplex.

Parameters:

Go: TOC

BPMux.prototype.multiplex([options])

Multiplex a new stream.Duplex over the carrier.

Parameters:

Return:

{Duplex} The new Duplex which is multiplexed over the carrier. This supports back-pressure using the stream readable event and write method.

Throws:

Go: TOC | BPMux.prototype

BPMux.events.peer_multiplex(duplex)

peer_multiplex event

A BPMux object emits a peer_multiplex event when it detects a new multiplexed stream from its peer on the carrier stream.

Parameters:

Go: TOC | BPMux.events

BPMux.events.handshake(duplex, handshake_data, [delay_handshake])

handshake event

A BPMux object emits a handshake event when it receives a handshake message from its peer on the carrier stream. This can happen in two cases:

  1. The BPMux object is processing a handshake message for a new multiplexed stream the peer created and it hasn't seen before. Note the handshake event is emitted after the peer_multiplex event.
  2. Your application previously called multiplex on its BPMux object to multiplex a new stream over the carrier and now the peer has replied with a handshake message.

Parameters:

Go: TOC | BPMux.events

BPMux.events.handshake_sent(duplex, complete)

handshake_sent event

A BPMux object emits a handshake_sent event after it sends a handshake message to its peer on the carrier stream.

Parameters:

Go: TOC | BPMux.events

BPMux.events.drain()

drain event

A BPMux object emits a drain event when its carrier stream emits a drain event.

Go: TOC | BPMux.events

BPMux.events.end()

end event

A BPMux object emits an end event after the carrier stream ends (will receive no more data).

Go: TOC | BPMux.events

BPMux.events.finish()

finish event

A BPMux object emits a finish event after the carrier stream finishes (won't write any more data).

Go: TOC | BPMux.events

BPMux.events.full()

full event

A BPMux object emits a full event when it wants to add a new multiplexed stream on the carrier stream but the number of multiplexed streams is at its maximum. It will remain at maximum until a removed event is emitted.

Go: TOC | BPMux.events

BPMux.events.removed(duplex)

removed event

A BPMux object emits a removed event when a multiplexed stream has closed (finished and ended) and been removed from the list of multiplexed streams.

Parameters:

Go: TOC | BPMux.events

BPMux.events.keep_alive()

keep_alive event

A BPMux object emits a keep_alive event when it receives a keep-alive message from its peer.

Go: TOC | BPMux.events

—generated by apidox