haxetink / tink_tcp


Strange thing when parsing #1

Closed kevinresol closed 7 years ago

kevinresol commented 8 years ago

I am writing a parser (for a mongodb driver) that parses the incoming data from a connection, but I hit a strange thing (a bug?): it won't consume all the available data from the source.

Suppose the incoming data is "05,00,00,00,01,05,00,00,00,02,05,00,00,00,03" (bytes in hex). This data actually comprises three individual messages:

- 05,00,00,00,01
- 05,00,00,00,02
- 05,00,00,00,03

For each message, the first four bytes are a little-endian int32 giving the total length of the message (header included), so all three of them are 05,00,00,00, which is 5.

The remaining bytes are just the payload.
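
For reference, the framing itself can be verified with plain haxe.io.Bytes, independent of tink_io. A minimal sketch (FrameDemo and splitFrames are made-up names for illustration, not library code):

import haxe.io.Bytes;

class FrameDemo {
    // Split a buffer of length-prefixed messages: the first four bytes of each
    // message are a little-endian int32 holding the total message length,
    // header included (Bytes.getInt32 reads little-endian).
    static function splitFrames(data:Bytes):Array<Bytes> {
        var frames = [];
        var pos = 0;
        while (pos + 4 <= data.length) {
            var len = data.getInt32(pos);
            frames.push(data.sub(pos, len));
            pos += len;
        }
        return frames;
    }

    static function main() {
        var hex = "05,00,00,00,01,05,00,00,00,02,05,00,00,00,03".split(",");
        var data = Bytes.alloc(hex.length);
        for (i in 0...hex.length) data.set(i, Std.parseInt('0x${hex[i]}'));
        for (f in splitFrames(data))
            trace([for (i in 0...f.length) StringTools.hex(f.get(i), 2)].join(","));
        // traces 05,00,00,00,01 then 05,00,00,00,02 then 05,00,00,00,03
    }
}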

At the end of this post is a reproducible example (running on nodejs). The output of that program is:

progress: 05,00,00,00,01,05,00,00,00,02,05,00,00,00,03
read    : 05,00,00,00,01
parsed  : 05,00,00,00,01
progress: 05,00,00,00,02,05,00,00,00,03
read    : 05,00,00,00,02
parsed  : 05,00,00,00,02

But I expected it to parse the third message as well (it didn't); the output should have continued with:

progress: 05,00,00,00,03
read    : 05,00,00,00,03
parsed  : 05,00,00,00,03

I tried to use a Source made of Bytes or BytesInput, and in those cases all 3 messages are parsed correctly. So I am not sure whether it is my problem (not writing a "correct" parser) or a bug in the library.
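
For comparison, the in-memory variant that does parse all three messages looks roughly like this (a sketch reusing hexToBytes, printBytes and the Parser class from the full example below):

var source:Source = hexToBytes("05,00,00,00,01,05,00,00,00,02,05,00,00,00,03");
source.parseStream(new Parser()).forEach(function(bytes) {
    trace('parsed  : ' + printBytes(bytes, 0, bytes.length)); // fires for all three messages
    return true;
});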

package;

import haxe.ds.Option;
import haxe.io.Bytes;
import haxe.io.BytesInput;
import haxe.io.BytesBuffer;
import tink.io.StreamParser;
import tink.io.Buffer;
import tink.io.Source;
import tink.io.Worker;
import tink.tcp.Server;
import tink.tcp.Connection;
import Main.*;

using tink.CoreApi;

class Main
{
    public static function main()
    {
        var s = "05,00,00,00,01,05,00,00,00,02,05,00,00,00,03";
        var source:Source = hexToBytes(s);

        Server.bind(13579).handle(function(o) switch o {
            case Success(server): 
                server.connected.handle(function(connection) {
                    source.pipeTo(connection.sink).handle(function() {});
                });

                Connection.tryEstablish(13579 #if !nodejs ,Worker.get(), Worker.get() #end).handle(function(o) switch o {
                    case Success(d): 
                        d.source.parseStream(new Parser()).forEach(function(bytes){
                            trace('parsed  : ' + printBytes(bytes, 0, bytes.length));
                            return true;
                        });
                    case Failure(err):
                        trace(err);
                });

            case Failure(err):
                trace(err);
        });
    }

    static function hexToBytes(str:String) {
        var s = str.split(',');
        var bytes = Bytes.alloc(s.length);
        for(i in 0...s.length) bytes.set(i, Std.parseInt('0x${s[i]}'));
        return bytes;
    }

    public static function printBytes(bytes:Bytes, start:Int, len:Int) {
        return [for(i in start...start+len) StringTools.hex(bytes.get(i), 2)].join(",");
    }
}

class Parser implements StreamParser<Bytes> {

    // the length of the current message (0 = waiting for the next header)
    var length = 0;
    // holds the completed message, if any, for the current progress() call
    var result:Option<Bytes>;
    // accumulates the bytes of the message being parsed
    var out:BytesBuffer;

    public function new() {

    }

    // the parser needs at least the 4-byte length header to make progress
    public function minSize()
        return 4;

    public function eof():Outcome<Bytes, Error> {
        trace('eof');
        return Success(null); // TODO: will this ever happen for a connection's source?
    }

    public function progress(buffer:Buffer):Outcome<Option<Bytes>, Error> {
        buffer.writeTo(this); // ends up calling writeBytes() below
        return Success(result);
    }

    // consumes at most one message per call and returns the number of bytes consumed
    function writeBytes(bytes:Bytes, start:Int, len:Int) {
        trace('progress: ' + printBytes(bytes, start, len));
        if(length == 0) {
            // starting a new message: read its little-endian length header
            result = None;
            length = bytes.getInt32(start);
            out = new BytesBuffer();
        }

        inline function min(a:Int, b:Int) return a > b ? b : a;

        // don't read more than needed
        var readLen = min(length - out.length, len);
        trace('read    : ' + printBytes(bytes, start, readLen));
        out.addBytes(bytes, start, readLen);
        if(out.length == length) {
            // message complete: publish it and reset for the next header
            result = Some(out.getBytes());
            length = 0;
        }
        return readLen;
    }

}
kevinresol commented 8 years ago

Another issue, which I believe is about tink_streams: I expect the stream to stop looping after parsing 3 times, but on neko it loops indefinitely.

progress: 05,00,00,00,01,05,00,00,00,02,05,00,00,00,03
read    : 05,00,00,00,01
parsed  : 05,00,00,00,01
progress: 05,00,00,00,02,05,00,00,00,03
read    : 05,00,00,00,02
parsed  : 05,00,00,00,02
progress: 05,00,00,00,03
read    : 05,00,00,00,03 (expected to stop right at this line)
parsed  : 05,00,00,00,03
parsed  : 05,00,00,00,03
parsed  : 05,00,00,00,03
parsed  : 05,00,00,00,03
parsed  : 05,00,00,00,03
parsed  : 05,00,00,00,03
parsed  : 05,00,00,00,03
....(indefinitely)
back2dos commented 8 years ago

This is great news (the mongodb driver). Will try to fix it later today.

kevinresol commented 8 years ago

I hope I can finish it. The mongodb spec seems huge.

kevinresol commented 7 years ago

When working with websockets I hit this same wall again. Btw, I think I can move the mongo protocol to the new protocol lib too, because the protocol part is already finished. The unfinished (unfinishable?) part is the server discovery/recovery, which is huge.

kevinresol commented 7 years ago

Investigated a bit and found that the problem lies in how flush is repeated in a Pipe here.

kevinresol commented 7 years ago

OK, the problem is like so: taking the example in the first post of this thread, the total incoming data is 15 bytes, and it all arrives in a single batch.
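
Since the parser consumes at most one 5-byte message per progress call, the pipe has to keep flushing the leftover buffered bytes back into the parser. Roughly, the expected accounting for that single 15-byte batch is:

flush 1: buffer holds 15 bytes -> consume 5 (message 1), 10 bytes left
flush 2: buffer holds 10 bytes -> consume 5 (message 2), 5 bytes left
flush 3: buffer holds 5 bytes  -> consume 5 (message 3), 0 bytes left

Judging from the traces, it is that third flush that never happens on nodejs (and on neko the last result is repeated instead).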

The logic is a bit complex here and I don't have the whole picture, so I am afraid to attempt a fix.