karlseguin / websocket.zig

A websocket implementation for zig
MIT License

Large message corrupt #27

Status: Closed (clickingbuttons closed this issue 7 months ago)

clickingbuttons commented 7 months ago

Not sure exactly why, but when I receive large messages (5 KB) in my pub fn handle(self: *Self, message: websocket.Message) !void handler, some sort of corruption happens. I'm expecting a JSON payload that starts with [{, but instead I get these bytes (in decimal):

3 2 174 64 85 30 101 127 129 253 154 73 127 144 111 213 120 224 141 77

I don't experience this issue with other websocket clients, so I believe it's some buffer issue. I'll investigate configuring a larger buffer tomorrow (maybe some 4 KB limit is causing it?), and if that doesn't work, I'll put together a reproduction.

Would love debugging pointers in the meantime.

clickingbuttons commented 7 months ago

After playing around a bit, I realized this error only happens when I back up the WebSocket server's buffer by not reading messages fast enough. I can buffer slow work on a separate thread, so I'm closing this.

I'm still curious why exactly this happens. I'll become acquainted with the RFC and find some instrumentation. Do you have any recommendations?

karlseguin commented 7 months ago

This is pretty concerning to me, I'm wondering if you might be able to provide more info. You mention both clients and server, and I'm not clear on whether this is happening while you're using the client element of the library (websocket.Client) or the server element (websocket.listen).

If it's the server, are you using it directly, or through the http.zig integration?

The message.data field is only valid until your handle function returns, so if you're holding onto any part of it, you need to dupe/own it. For example, with std.json you'd need to make sure .allocate = .alloc_always is set in the json ParseOptions (this is set in some cases, but not in others, depending on which std.json API you're using, I think).
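A minimal sketch of the two ownership options mentioned above, assuming the handler receives the payload as a byte slice. The Trade struct and the ownData and parseOwned helpers are hypothetical names for illustration, not part of websocket.zig; only allocator.dupe and std.json.parseFromSlice with .allocate = .alloc_always are real std APIs.

```zig
const std = @import("std");

// Hypothetical payload type, loosely modeled on the trade JSON in this thread.
const Trade = struct {
    ev: []const u8,
    sym: []const u8,
};

// Copies the bytes so they stay valid after the handler returns.
// `data` stands in for message.data; the caller frees the result.
fn ownData(allocator: std.mem.Allocator, data: []const u8) ![]u8 {
    return allocator.dupe(u8, data);
}

// Parses JSON so that every string is copied out of `data` instead of
// pointing into it. The caller must call deinit() on the result.
fn parseOwned(allocator: std.mem.Allocator, data: []const u8) !std.json.Parsed([]Trade) {
    return std.json.parseFromSlice([]Trade, allocator, data, .{
        .allocate = .alloc_always,
        .ignore_unknown_fields = true,
    });
}
```

Either helper would be called inside handle with message.data, and the returned value handed to whatever outlives the call (a queue, a worker thread, and so on).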

clickingbuttons commented 7 months ago

Hey, I'm experiencing this even without a slow handle so I'm glad you reopened.

To provide additional context, here's the simplest reproduction and here's the repo. Even with the slow parts commented out I see this error. I either get a response that has length 109 and starts with 23 3 3 0 124 14 or one with length 5891 and starts with 3 2 ....
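One possible reading of the 23 3 3 0 124 prefix: it has the shape of a TLS record header (one content-type byte, a two-byte version, a two-byte big-endian length), where 23 is the application_data content type and 3 3 is the 0x0303 legacy version. That is only an interpretation of the bytes, not a confirmed diagnosis. The sketch below decodes such a prefix; RecordHeader and parseRecordHeader are hypothetical names, not part of std or websocket.zig.

```zig
const std = @import("std");

// Hypothetical helper type: the first five bytes of a TLS record.
const RecordHeader = struct {
    content_type: u8, // 23 means application_data
    version: u16, // 0x0303 is the legacy record version
    length: u16, // length of the record body that follows
};

// Returns null when fewer than five bytes are available.
fn parseRecordHeader(bytes: []const u8) ?RecordHeader {
    if (bytes.len < 5) return null;
    return .{
        .content_type = bytes[0],
        .version = (@as(u16, bytes[1]) << 8) | bytes[2],
        .length = (@as(u16, bytes[3]) << 8) | bytes[4],
    };
}

pub fn main() void {
    // The prefix reported above for the 109-byte response.
    const prefix = [_]u8{ 23, 3, 3, 0, 124, 14 };
    if (parseRecordHeader(&prefix)) |h| {
        std.debug.print("type={d} version=0x{x:0>4} length={d}\n", .{
            h.content_type, h.version, h.length,
        });
    }
}
```

If the bytes handed to handle decode cleanly like this, it would suggest that undecrypted TLS record data is reaching the websocket layer, rather than the websocket framing itself being wrong.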

Sadly wss://socket.polygon.io requires a subscription but I'm happy to debug on this issue or over Discord.

I'm also occasionally encountering this issue:

thread 167871 panic: index out of bounds: index 16762, len 16645
/home/thesm/.local/share/zvm/0.12.0-dev.2811+3cafb9655/lib/std/crypto/tls/Client.zig:1271:68: 0x11d32a0 in finishRead (stream)
        @memcpy(c.partially_read_buffer[c.partial_ciphertext_idx..][0..saved_buf.len], saved_buf);
                                                                   ^
/home/thesm/.local/share/zvm/0.12.0-dev.2811+3cafb9655/lib/std/crypto/tls/Client.zig:1092:34: 0x11cd7b6 in readvAdvanced__anon_9344 (stream)
                return finishRead(c, frag, in, vp.total);
                                 ^
/home/thesm/.local/share/zvm/0.12.0-dev.2811+3cafb9655/lib/std/crypto/tls/Client.zig:928:38: 0x11dad26 in readvAtLeast__anon_9343 (stream)
        var amt = try c.readvAdvanced(stream, iovecs[vec_i..]);
                                     ^
/home/thesm/.local/share/zvm/0.12.0-dev.2811+3cafb9655/lib/std/crypto/tls/Client.zig:889:24: 0x11db1f6 in readAtLeast__anon_9341 (stream)
    return readvAtLeast(c, stream, &iovecs, len);
                       ^
/home/thesm/.local/share/zvm/0.12.0-dev.2811+3cafb9655/lib/std/crypto/tls/Client.zig:894:23: 0x11db2c1 in read__anon_9340 (stream)
    return readAtLeast(c, stream, buffer, 1);
                      ^
/home/thesm/.cache/zig/p/1220ce168e550f8904364acd0a72f5cafd40caa08a50ba83aac50b97ba700d7bcf20/src/client.zig:290:26: 0x11db405 in read (stream)
   return tls_client.read(self.stream, buf);
                         ^
/home/thesm/.cache/zig/p/1220ce168e550f8904364acd0a72f5cafd40caa08a50ba83aac50b97ba700d7bcf20/src/reader.zig:288:29: 0x11e1d80 in read__anon_9520 (stream)
   const n = try stream.read(buf[pos..buf_end]);
                            ^
/home/thesm/.cache/zig/p/1220ce168e550f8904364acd0a72f5cafd40caa08a50ba83aac50b97ba700d7bcf20/src/reader.zig:107:23: 0x11e29ce in readMessage__anon_9518 (stream)
    if ((try self.read(stream, data_needed)) == false) {
                      ^
/home/thesm/.cache/zig/p/1220ce168e550f8904364acd0a72f5cafd40caa08a50ba83aac50b97ba700d7bcf20/src/client.zig:141:38: 0x11e76c8 in readLoop__anon_9539 (stream)
   const message = reader.readMessage(stream) catch |err| switch (err) {
                                     ^
/home/thesm/src/polycsv/src/stream.zig:78:32: 0x11e86ae in main (stream)
    try handler.client.readLoop(&handler);
                               ^
/home/thesm/.local/share/zvm/0.12.0-dev.2811+3cafb9655/lib/std/start.zig:511:37: 0x11e93ce in main (stream)
            const result = root.main() catch |err| {
                                    ^
???:?:?: 0x779fabe5dccf in ??? (libc.so.6)
Unwind information for `libc.so.6:0x779fabe5dccf` was not available, trace may be incomplete

I receive many messages between 100 and 1000 bytes at a rate of ~11k/sec and an occasional large message of 5-10KB.

clickingbuttons commented 7 months ago

The data field does eventually include my expected message.

39��WDÎm°Dfs=[Ic��Z�&'
                                      kҰpߧ:Sԗy
                                              ��dn&g��'{�1F�xc.8=h`w>�EԴ&k2ɋ/݉��g�:21,"p":35.95,

Then after a while there's another header-looking bit:

~[{"ev":"T","sym":"NEWP"," 09��WDÎm°Dfs=[Ic��Z�&'
                       kҰpߧ:Sԗy
                               ��dn&g��'{�1F�xc.8=h`w>�EԴ&k2ɋ/݉��g�:21,"p":35.95,"s":23

clickingbuttons commented 7 months ago

I made a server using this library that emulates wss://socket.polygon.io with the occasional large message:

const std = @import("std");
const websocket = @import("websocket");
const Conn = websocket.Conn;
const Message = websocket.Message;
const Handshake = websocket.Handshake;
const RndGen = std.rand.DefaultPrng;

const address = "127.0.0.1";
const port = 8080;

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer std.debug.assert(gpa.deinit() == .ok);
    const allocator = gpa.allocator();

    std.debug.print("listening on {s}:{d}\n", .{ address, port });
    try websocket.listen(Handler, allocator, {}, .{ .address = address, .port = port });
}

const Handler = struct {
    conn: *Conn,
    n_recv: usize = 0,

    pub fn init(_: Handshake, conn: *Conn, _: void) !Handler {
        return Handler{ .conn = conn };
    }

    pub fn afterInit(self: *Handler) !void {
        try self.conn.write("welcome");
    }

    pub fn handle(self: *Handler, message: Message) !void {
        _ = message;

        switch (self.n_recv) {
            0 => try self.conn.write("authed"),
            1 => {
                const trade =
                    \\{"ev":"T","sym":"HIVE","i":"1223","x":11,"p":4.08,"s":200,"t":1708453945556,"q":4216525,"z":3}
                    ;
                try self.conn.write("subscribed");
                var rnd = RndGen.init(0);

                const max_trades = 100;
                var send_buf: [(trade.len + 1) * max_trades + 2]u8 = undefined;
                var stream = std.io.fixedBufferStream(&send_buf);
                var writer = stream.writer();

                var n_sent: usize = 1;
                while (true) : (n_sent += 1) {
                    const n_trades = switch (n_sent % 10_000) {
                        0 => max_trades,
                        else => rnd.random().intRangeLessThan(usize, 1, 10),
                    };

                    stream.reset();
                    try writer.writeAll("[");
                    for (0..n_trades) |i| {
                        try writer.writeAll(trade);
                        if (i != n_trades - 1) try writer.writeAll(",");
                    }
                    try writer.writeAll("]");
                    try self.conn.write(send_buf[0..stream.pos]);
                    std.time.sleep(std.time.ns_per_s / 10_000);
                }
            },
            else => try self.conn.write("go away"),
        }
        self.n_recv += 1;
    }

    // called whenever the connection is closed, can do some cleanup in here
    pub fn close(_: *Handler) void {
        std.debug.print("closed\n", .{});
    }
};

I had to increase the max_size on the client but wasn't able to reproduce the issue. That means it's likely not a throughput issue.

karlseguin commented 7 months ago

The index out of bounds error appears to be a bug in Zig's TLS client. There's an existing issue which I've commented on with a somewhat reproducible example.

clickingbuttons commented 7 months ago

Confirmed a std TLS issue. Closing.