kawanet / msgpack-lite

Fast Pure JavaScript MessagePack Encoder and Decoder / msgpack.org[JavaScript]
https://www.npmjs.com/package/msgpack-lite
MIT License

encode stream does not proactively send data to its output #80

Open josephg opened 6 years ago

josephg commented 6 years ago

If you create an encode stream and pipe it somewhere (for example, the network), the data you send to the stream won't be forwarded automatically. It only seems to be sent when the server end()'s the writable stream. This makes the transform stream basically unusable for streaming realtime data.

There's a workaround of calling stream.encoder.flush(), but that seems brittle.

josephg commented 6 years ago

Example:

const msgpack = require("msgpack-lite")
const net = require('net')

net.createServer(socket => {
  const encodeStream = msgpack.createEncodeStream()
  encodeStream.pipe(socket)

  // Send 'hi' to the socket every second
  setInterval(() => encodeStream.write('hi'), 1000)
}).listen(3300)

console.log('TCP server listening on port 3300')

This server should send a msgpack message containing 'hi' to each client every second. Instead, the stream buffers the small messages internally and nothing is sent to the network socket.

$ nc localhost 3300
   # No output!?

The problem is avoided by calling this.encoder.flush(); inside the _transform method of encode-stream.js. It seems sort of inelegant though.
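For reference, the workaround can also be applied from user code without patching encode-stream.js. This is only a sketch: `writeAndFlush` is a hypothetical helper name, and it relies on the stream exposing an `encoder` object with a `flush()` method, which is an internal detail rather than a documented API and may change.

```javascript
// Hypothetical helper (not part of msgpack-lite): write one value, then
// force the encoder's internal buffer out to the readable side.
function writeAndFlush(encodeStream, value) {
  const ok = encodeStream.write(value)

  // Assumption: the encode stream exposes `encoder.flush()`, as described above.
  // Fail loudly if that internal detail is not there.
  if (!encodeStream.encoder || typeof encodeStream.encoder.flush !== 'function') {
    throw new TypeError('encodeStream.encoder.flush() is not available')
  }
  encodeStream.encoder.flush()

  return ok // same backpressure signal that write() returned
}

// Usage in the server above:
//   setInterval(() => writeAndFlush(encodeStream, 'hi'), 1000)
```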

rjeczalik commented 6 years ago

I ran into this issue as well. As a workaround, I'm calling _flush() after each write on the encode stream.

Would extending createEncodeStream with an optional options argument make sense? The API would look like:

net.createServer(socket => {
  const encodeStream = msgpack.createEncodeStream({autoflush: true})
  encodeStream.pipe(socket)

  // Send 'hi' to the socket every second
  setInterval(() => encodeStream.write('hi'), 1000)
}).listen(3300)

This would make write() flush the data on each call, without user code needing to call methods it shouldn't be calling.
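To make the proposed semantics concrete, here is a rough, dependency-free sketch of a buffering transform with an `autoflush` switch. It is not msgpack-lite's implementation: `createToyEncodeStream` is a made-up name, `autoflush` is only the option proposed above, and JSON lines stand in for MessagePack encoding so the example runs without the library.

```javascript
const { Transform } = require('stream')

// Toy stand-in for an encode stream. The default `encode` is a placeholder
// (JSON plus newline), NOT MessagePack.
function createToyEncodeStream({
  autoflush = false,
  encode = value => Buffer.from(JSON.stringify(value) + '\n')
} = {}) {
  let pending = [] // encoded bytes that have not been pushed downstream yet

  // Push everything buffered so far as a single chunk
  function drain() {
    if (pending.length === 0) return
    stream.push(Buffer.concat(pending))
    pending = []
  }

  const stream = new Transform({
    writableObjectMode: true, // accept arbitrary values, emit bytes
    transform(value, _encoding, callback) {
      pending.push(encode(value))
      if (autoflush) drain() // proposed behaviour: forward on every write
      callback()
    },
    flush(callback) {
      drain() // without autoflush, data only leaves when the stream ends
      callback()
    }
  })

  return stream
}
```

With `autoflush: false` a written value stays in `pending` until `end()` is called, which mirrors the behaviour reported in this issue; with `autoflush: true` each `write()` makes its bytes readable right away.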

josephg commented 6 years ago

Yeah that'd do the trick!

kawanet commented 6 years ago

Right.

A msgpack.createEncodeStream() instance does not manage the stream buffer per item. One item might be split across multiple encoded chunks, or buffered for a while. Multiple items might be joined into a single chunk.

Using it with an fs stream works well because memory copy operations are reduced. Using it with a net stream may cause trouble in some cases when chunks are split.

I would rather use the simple msgpack.encode() interface for WebSocket messaging applications.
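A minimal sketch of that approach: encode each value on its own and write the resulting bytes straight to the destination, so nothing waits in an encode-stream buffer. `sendEncoded` is a hypothetical helper; the encoder is passed in as a parameter so the snippet stays self-contained, and with msgpack-lite it would be `msgpack.encode`.

```javascript
// Hypothetical helper: encode one value and write it immediately.
// `encode` is injected; with msgpack-lite, pass `msgpack.encode`,
// which returns the encoded bytes for a single value.
function sendEncoded(writable, value, encode) {
  const bytes = encode(value)
  // write() returns false when the destination wants the caller to wait for 'drain'
  return writable.write(bytes)
}

// Usage, assuming msgpack-lite is installed:
//   const msgpack = require('msgpack-lite')
//   setInterval(() => sendEncoded(socket, 'hi', msgpack.encode), 1000)
```

Each call produces exactly one write per message, which suits message-oriented transports; on a byte stream such as TCP the receiver still has to cope with messages arriving split or joined.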

rjeczalik commented 6 years ago

@kawanet WebSocket is message-oriented, so msgpack.encode() will work. But how should one handle streaming msgpack, e.g. over a raw TCP socket? Like this simple echo server:

const msgpack = require('msgpack-lite')
const net = require('net')

const echo = (value, enc) => {
  enc.write(value)
  enc._flush() // required, otherwise the client won't get the response
}

net.createServer(socket => {
  const enc = msgpack.createEncodeStream()
  const dec = msgpack.createDecodeStream()

  enc.pipe(socket)
  socket.pipe(dec)

  dec.on('data', value => echo(value, enc))
}).listen(3300)