Closed ashisherc closed 4 years ago
Have you tried using a decoder in stream mode? It should be usable as a Transform stream from bytes to objects. If that won't work for you, I can add an option to decodeFirst
, I suppose.
Yes I used the decider in the stream mode.
readable.pipe(d)
Heres what happens,
if the readable has 2 valid cbor encoded msgs in bytes,
data event gets triggered 2 times with correct data.
If the readable stream has 2 valid cbor encodes msges and half set of bytes of the incoming msg through network, eg. In pipelining protocol communication
The data event is not triggered and instead error.
2 data event triggered with consumed bytes so as I can maintain the stream by removing used msg chunks
I am trying to implement a pipelining protocol which has headers that I need to read and accordingly decode the msg by grouping multiple packets by cbor.
Assuming the protocol looks like this:
header cbor footer
header cbor footer
and that you're stripping off the first header, then passing bytes into the decoder. That means the footer and next header are not valid cbor and can't be decoded. Try writing another transform stream called Strip
that strips the headers and footers, then pipe the output into a cbor Decoder
:
socket -> [bytes] -> Strip -> [bytes] -> cbor.Decoder -> [objects]
If the header and footer are constant-size, this should be pretty easy. Something like this, if the header is consistently four bytes with a network-order length and there is no footer:
const {Transform} = require('stream')
const START = Symbol('START')
const BODY = Symbol('BODY')
class Strip extends Transform {
constructor(...opts) {
super(...opts)
this.state = START
this.bufs = []
this.count = 0
this.left = Infinity
}
eat(size) {
const all = Buffer.concat(this.bufs)
const ret = Buffer.from(all, 0, size)
this.count -= ret.length
this.bufs = this.count ? [Buffer.from(all, size, Infinity)] : []
return ret
}
_transform(chunk, encoding, callback) {
this.bufs.push(chunk)
this.count += chunk.length
while (this.count > 0) {
switch (this.state) {
case START:
if (this.count >= 4) {
this.left = this.eat(4).readUInt32BE()
this.state = BODY
} else {
return callback()
}
break
case BODY:
const ret = this.eat(this.left)
this.push(ret)
this.left -= ret.length
if (this.left == 0) {
this.state = START
} else {
return callback()
}
break
}
callback()
}
}
}
Thank you for the detailed response, I was not piping on the socket, I was listening on data event on the socket and creating a list of all messages using BufferList, then piping it to the decoder.
I had to some modification here, not sure what could be wrong but,
const ret = Buffer.from(all, 0, size) This returned the whole all buffer, hence I replaced this with all.slice(0, size);
this.bufs = this.count ? [Buffer.from(all, size, Infinity)] : [] and this with this.bufs = this.count ? [all.slice(size, Infinity)] : []
the header size was 8 for me, hence I have modified that and the length of the body was in byte 6-8, I have done changes accordingly.
I tested and worked fine for a single msg in a single packet. Before I could test further, I am receiving below error on the first message itself after getting the decoded result.
Error [ERR_MULTIPLE_CALLBACK]: Callback called multiple times
Stack trace is pointing to this line
while(){
switch(){}
> callback()
}
Ok, it was a typo, callback() should be outside the while loop.
Going ahead, I tried to decode a message that was divided into multiple packets. Like below,
"msg of size 648566"
The sender divides the msg into smaller packets of max 12296 bytes and remaining size into another packet, which comes down to near 53 packets with every packet a header with the size of the payload which is 12288 (+8 size of the header)
To decode these packets, I have to remove every header from the packets and put together the remaining 12288 bytes from all packets, which can then be decoded by cbor.
Now the socket receives the messages with some 2 or 3 complete or the half set of packets bytes combined, the transform on the Strip transformer is called that many times (somewhere around 11 messages consisting of all 53 packets), this causes decoder receiving that many messages as separate streams piped from the socket, and hence doesn't work as expected.
socket.pipe(strip).pipe(d)
Whereas I want to be able to maintain a queue of the messages (header stripped) until they are successfully decoded, and therefore moving ahead with the next msg of packets.
I can use the same Strip class for the this use case, if I somehow get to know if the Decoder was able to decode the msg and did not result in error.
I implemented a temp store to put all the bytes, processing each of them until they can be successfully decoded without using pipe and stream interface. If the decoding is successful, clear the temp store and move on to the next msg.
This is the max I could implement to talk to the server using it's protocol
I thought of another way to do this:
function getFirstCBOR(buf, opts) {
return new Promise((resolve, reject) => {
const dec = new cbor.Decoder(opts)
dec.once('data', data => {
const ret = {
data,
leftover: dec.bs.read()
}
dec.close()
resolve(ret)
})
dec.on('error', reject)
dec.end(buf)
})
}
getFirstCBOR(Buffer.alloc(12)).then(console.log)
This prints:
{ data: 0, leftover: <Buffer 00 00 00 00 00 00 00 00 00 00 00> }
Lots of extra copies, but better than parsing a bunch of times, I bet.
I still need to try hitting getFirstCBOR multiple times, as I never know when the last packet is received. The protocol packets do not contain the start and end packet information.
When processing a byte stream, it will be helpful to return the result with bytes consumed count or leftover bytes count. This can be used when processing continuous flow of byte stream with many messages in smaller chunks during a networking application.
Currently, when decoding a stream it successfully decodes all possible messages in the byte stream. If there are any extra chunks present in the stream, eg. socket is receiving packets continuously. The decoder will result in an error.