karlseguin / http.zig

An HTTP/1.1 server for Zig
MIT License

req.formData() causing corruption of the resulting key/values when escape characters are parsed (under load) #42

Closed Visytel closed 8 months ago

Visytel commented 8 months ago

Karl,

Firstly, a big thank you for your Zig projects (especially http.zig and pg.zig). Since I am only starting out in Zig (my first two weeks of coding, coming from C), I was not able to create a PR to solve this issue. However, perhaps you already know how this can happen and can point me in the right direction to stabilise req.formData().

My setup:

A Debian box running Locust (https://docs.locust.io/en/stable/what-is-locust.html) hitting my Zig project on a separate Debian 12 box. The project is built with the master version of Zig and uses your http.zig and pg.zig. I swarm in with 800 users, starting 10 users per second, with a random wait time between POST tests of 1 to 50 ms. So a decent test.

What I have found is that when unescaping characters is required, the resulting key/values can become corrupted during the call to req.formData().

After some debugging on my side, the problem seems to stem from var buf = self.spare being passed into Url.unescape(allocator, buf, pair[0..sep]) and Url.unescape(allocator, buf, pair[sep + 1 ..]). When Url.unescape then processes escape characters, the resulting fd.keys and fd.values can become corrupted.

I have added some basic debugging to your HTTP.zig codebase and you can see the corruption taking place around parsing %2Fpath%2Fservicefile.lua (FYI the line numbers will not directly correspond to your HTTP commit version due to my debugging additions):

Note (1): b= self.body() = "session_id=abc_123&node_id=0&leg_id=0&service_file=%2Fpath%2Fservicefile.lua&data_source=POSTGRESQL&sql_statement=SELECT+%2A+FROM+service_data.visytel_test+ORDER+BY+visytel_test_id+ASC"

Note (2): out=.................. is based on out[0..unescaped_len]

Problem trace:

[httpz/src/request.zig:250][TID:6514]DEBUG 0.2 fd.keys[0..fd.len]: { session_id, node_id, leg_id }
[httpz/src/request.zig:251][TID:6514]DEBUG 0.1 fd.values[0..fd.len]: { abc_123, 0, 0 }
[httpz/src/url.zig:101][TID:6514]DEBUG HERE 2: buffer.len=32482, unescaped_len=21, input=%2Fpath%2Fservicefile.lua
[httpz/src/url.zig:110][TID:6514]DEBUG HERE > out=session_id=abc_123&no
[httpz/src/url.zig:114][TID:6514]DEBUG HERE 4 b=37, i=0
[httpz/src/url.zig:116][TID:6514]DEBUG HERE 5 enc=2F
[httpz/src/url.zig:117][TID:6514]DEBUG HERE >> out=session_id=abc_123&no
[httpz/src/url.zig:114][TID:6514]DEBUG HERE 4 b=37, i=5
[httpz/src/url.zig:116][TID:6514]DEBUG HERE 5 enc=2F
[httpz/src/url.zig:117][TID:6514]DEBUG HERE >> out=/pathon_id=abc_123&no
[httpz/src/url.zig:153][TID:6514]DEBUG HERE 6 out=/path/servicefile.lua
[httpz/src/request.zig:254][TID:6514]DEBUG 1 value_res.value.len: 21
[httpz/src/request.zig:255][TID:6514]DEBUG 1 buf.len: 32482
[httpz/src/request.zig:256][TID:6514]DEBUG 0.3 fd.keys[0..fd.len]: { /path/serv, uade_id, leg_id }
[httpz/src/request.zig:257][TID:6514]DEBUG 0.2 fd.values[0..fd.len]: { cefile., 0, 0 }
[httpz/src/request.zig:259][TID:6514]DEBUG 0.4 fd.keys[0..fd.len]: { /path/serv, uade_id, leg_id }
[httpz/src/request.zig:260][TID:6514]DEBUG 2 buf.len: 32461
[httpz/src/request.zig:263][TID:6514]DEBUG 1 key:value: service_file:/path/servicefile.lua
[httpz/src/request.zig:264][TID:6514]DEBUG 1 fd.keys[0..fd.len]: { /path/serv, uade_id, leg_id }
[httpz/src/key_value.zig:37][TID:6514]add [3]key:value service_file:/path/servicefile.lua
[httpz/src/request.zig:267][TID:6514]DEBUG 2 key:value: service_file:/path/servicefile.lua
[httpz/src/request.zig:269][TID:6514]DEBUG 2 fd.keys[0..fd.len]: { /path/serv, uade_id, leg_id, service_file }

Good trace:

[httpz/src/request.zig:250][TID:6512]DEBUG 0.2 fd.keys[0..fd.len]: { session_id, node_id, leg_id }
[httpz/src/request.zig:251][TID:6512]DEBUG 0.1 fd.values[0..fd.len]: { abc_123, 0, 0 }
[httpz/src/url.zig:101][TID:6512]DEBUG HERE 2: buffer.len=32298, unescaped_len=21, input=%2Fpath%2Fservicefile.lua
[httpz/src/url.zig:110][TID:6512]DEBUG HERE > out=?????????????????????
[httpz/src/url.zig:114][TID:6512]DEBUG HERE 4 b=37, i=0
[httpz/src/url.zig:116][TID:6512]DEBUG HERE 5 enc=2F
[httpz/src/url.zig:117][TID:6512]DEBUG HERE >> out=?????????????????????
[httpz/src/url.zig:114][TID:6512]DEBUG HERE 4 b=37, i=5
[httpz/src/url.zig:116][TID:6512]DEBUG HERE 5 enc=2F
[httpz/src/url.zig:117][TID:6512]DEBUG HERE >> out=/path????????????????
[httpz/src/url.zig:153][TID:6512]DEBUG HERE 6 out=/path/servicefile.lua
[httpz/src/request.zig:254][TID:6512]DEBUG 1 value_res.value.len: 21
[httpz/src/request.zig:255][TID:6512]DEBUG 1 buf.len: 32298
[httpz/src/request.zig:256][TID:6512]DEBUG 0.3 fd.keys[0..fd.len]: { session_id, node_id, leg_id }
[httpz/src/request.zig:257][TID:6512]DEBUG 0.2 fd.values[0..fd.len]: { abc_123, 0, 0 }
[httpz/src/request.zig:259][TID:6512]DEBUG 0.4 fd.keys[0..fd.len]: { session_id, node_id, leg_id }
[httpz/src/request.zig:260][TID:6512]DEBUG 2 buf.len: 32277
[httpz/src/request.zig:263][TID:6512]DEBUG 1 key:value: service_file:/path/servicefile.lua
[httpz/src/request.zig:264][TID:6512]DEBUG 1 fd.keys[0..fd.len]: { session_id, node_id, leg_id }
[httpz/src/key_value.zig:37][TID:6512]add [3]key:value service_file:/path/servicefile.lua
[httpz/src/request.zig:267][TID:6512]DEBUG 2 key:value: service_file:/path/servicefile.lua
[httpz/src/request.zig:269][TID:6512]DEBUG 2 fd.keys[0..fd.len]: { session_id, node_id, leg_id, service_file }

I appreciate any tips to resolve this. Also, I noticed var buf = self.spare is 32K in size, and I have not investigated where else it is used.

Thanks,

Andrew Keil

karlseguin commented 8 months ago

I haven't been able to reproduce the issue yet. It's pretty alarming. Does the request also have a querystring and, if so, is req.query() being called?

Is every request the same body and otherwise the same?

The debug statement in url.zig on line 101 (HERE 2), is that inside or outside of the if (buffer.len < unescaped_len) { block?

There's a pool of re-used request (and response) state objects. The Request.State has a buffer: []u8 used to read the request (the whole request). It defaults to 32 768. If your request is, say 768 bytes, then that buffer still has 32 000 free bytes, aka spare. When decoding parameters (in the querystring or the body), we re-use this space to store the decoded value. If spare isn't large enough to accommodate the decoded value, space is dynamically allocated in the request arena. (From this description, I assume HERE 2 is outside the if block, but just making sure).

Because unescaped values are never longer than their escaped forms, you could overwrite the read body directly. That is, instead of unescaping into spare, you could write the data over those already-read 768 bytes and just slice it correctly. But that would overwrite the original request and maybe make some debugging or advanced usage complicated/impossible.

Visytel commented 8 months ago

Firstly, req.query() is not used. I do send two POST bodies when testing:

(1) "session_id=abc_123&node_id=0&leg_id=0&service_file=%2Fpath%2Fservicefile.lua&data_source=POSTGRESQL&sql_statement=SELECT+%2A+FROM+service_data.visytel_test+ORDER+BY+visytel_test_id+ASC"
(2) "session_id=abc_123&node_id=0&leg_id=0&service_file=%2Fpath%2Fservicefile.lua&data_source=POSTGRESQL&sql_statement=UPDATE+service_data.visytel_test+SET+test_smallint%3D99+WHERE+visytel_test_id%3D1"

Here are the main files (with my extra debugging) so you can correlate the messages. I am more than happy to add any extra debugging if you think that would help.

FYI: This problem was happening prior to me adding the debugging below.

request.zig:

fn parseFormData(self: *Request) !KeyValue {
        const b = self.body() orelse "";

        std.debug.print("[{s}:{d}][TID:{d}]DEBUG b: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), b});

        if (b.len == 0) {
            self.fd_read = true;
            return self.fd;
        }

        var fd = &self.fd;
        var buf = self.spare;
        const allocator = self.arena;

        var it = std.mem.splitScalar(u8, b, '&');
        while (it.next()) |pair| {

            std.debug.print("[{s}:{d}][TID:{d}]DEBUG pair: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), pair});

            if (std.mem.indexOfScalarPos(u8, pair, 0, '=')) |sep| {

                std.debug.print("[{s}:{d}][TID:{d}]DEBUG sep: {d}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), sep});
                //std.debug.print("[{s}:{d}][TID:{d}]DEBUG 1 buf: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), buf});
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 0.1 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});

                const key_res = try Url.unescape(allocator, buf, pair[0..sep]);
                if (key_res.buffered) {
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 1 value_res.value.len: {d}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), key_res.value.len});                
                    buf = buf[key_res.value.len..];
                }

                //std.debug.print("[{s}:{d}][TID:{d}]DEBUG 2 buf: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), buf[key_res.value.len..]});

                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 0.2 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 0.1 fd.values[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.values[0..fd.len]});
                const value_res = try Url.unescape(allocator, buf, pair[sep + 1 ..]);
                if (value_res.buffered) {
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 1 value_res.value.len: {d}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), value_res.value.len});      
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 1 buf.len: {d}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), buf.len});  
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 0.3 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});  
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 0.2 fd.values[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.values[0..fd.len]});                  
                    buf = buf[value_res.value.len..];
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 0.4 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});                  
                    std.debug.print("[{s}:{d}][TID:{d}]DEBUG 2 buf.len: {d}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), buf.len});                  
                }

                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 1 key:value: {s}:{s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), key_res.value, value_res.value});
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 1 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});

                fd.add(key_res.value, value_res.value);
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 2 key:value: {s}:{s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), key_res.value, value_res.value});

                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 2 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});
            } else {
                const key_res = try Url.unescape(allocator, buf, pair);
                if (key_res.buffered) {
                    buf = buf[key_res.value.len..];
                }
                fd.add(key_res.value, "");
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG 3 fd.keys[0..fd.len]: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), fd.keys[0..fd.len]});
            }
        }

        self.spare = buf;
        self.fd_read = true;

        std.debug.print("[{s}:{d}][TID:{d}]DEBUG 2 b: {s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), b});

        return self.fd;
    }

url.zig:

    pub fn unescape(allocator: Allocator, buffer: []u8, input: []const u8) !UnescapeResult {
        var has_plus = false;
        var unescaped_len = input.len;

        var in_i: usize = 0;
        while (in_i < input.len) {
            const b = input[in_i];
            if (b == '%') {
                if (in_i + 2 >= input.len or !HEX_CHAR[input[in_i + 1]] or !HEX_CHAR[input[in_i + 2]]) {
                    return error.InvalidEscapeSequence;
                }
                in_i += 3;
                unescaped_len -= 2;
            } else if (b == '+') {
                has_plus = true;
                in_i += 1;
            } else {
                in_i += 1;
            }
        }

        // no encoding, and no plus. nothing to unescape
        if (unescaped_len == input.len and !has_plus) {
            std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE 1: buffer.len={d}, unescaped_len={d}, input={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), buffer.len, unescaped_len, input});
            return .{ .value = input, .buffered = false };
        }

        var out = buffer;
        var buffered = true;
        std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE 2: buffer.len={d}, unescaped_len={d}, input={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), buffer.len, unescaped_len, input});
        if (buffer.len < unescaped_len) {
            std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE 3\n", .{ @src().file, @src().line, std.Thread.getCurrentId()});
            out = try allocator.alloc(u8, unescaped_len);
            metrics.allocUnescape(unescaped_len);
            buffered = false;
        }

        in_i = 0;
        std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE > out={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), out[0..unescaped_len]});
        for (0..unescaped_len) |i| {
            const b = input[in_i];
            if (b == '%') {
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE 4 b={d}, i={d}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), b, i});
                const enc = input[in_i + 1 .. in_i + 3];
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE 5 enc={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), enc});
                std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE >> out={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), out[0..unescaped_len]});
                out[i] = switch (@as(u16, @bitCast(enc[0..2].*))) {
                    ENC_20 => ' ',
                    ENC_21 => '!',
                    ENC_22 => '"',
                    ENC_23 => '#',
                    ENC_24 => '$',
                    ENC_25 => '%',
                    ENC_26 => '&',
                    ENC_27 => '\'',
                    ENC_28 => '(',
                    ENC_29 => ')',
                    ENC_2A => '*',
                    ENC_2B => '+',
                    ENC_2C => ',',
                    ENC_2F => '/',
                    ENC_3A => ':',
                    ENC_3B => ';',
                    ENC_3D => '=',
                    ENC_3F => '?',
                    ENC_40 => '@',
                    ENC_5B => '[',
                    ENC_5D => ']',
                    else => HEX_DECODE[enc[0]] << 4 | HEX_DECODE[enc[1]],
                };
                //std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE >>> out={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), out[0..unescaped_len]});
                in_i += 3;
            } else if (b == '+') {
                out[i] = ' ';
                in_i += 1;
            } else {
                out[i] = b;
                in_i += 1;
            }
            //std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE >>>> out={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), out[0..unescaped_len]});
        }
        std.debug.print("[{s}:{d}][TID:{d}]DEBUG HERE 6 out={s}\n", .{ @src().file, @src().line, std.Thread.getCurrentId(), out[0..unescaped_len]});
        return .{ .value = out[0..unescaped_len], .buffered = buffered };
    }
Visytel commented 8 months ago

One more thing: I have simplified my tests to run only the one POST (and removed my authentication checks and code), and it still happens:

(1) "session_id=abc_123&node_id=0&leg_id=0&service_file=%2Fpath%2Fservicefile.lua&data_source=POSTGRESQL&sql_statement=SELECT+%2A+FROM+service_data.visytel_test+ORDER+BY+visytel_test_id+ASC"

For example:

[httpz/src/request.zig:250][TID:7108]DEBUG 0.2 fd.keys[0..fd.len]: { /path/serv, uade_id, leg_id, service_file, data_source }
[httpz/src/request.zig:251][TID:7108]DEBUG 0.1 fd.values[0..fd.len]: { cefile., 0, 0, /path/servicefile.lua, POSTGRESQL }
[httpz/src/url.zig:101][TID:7108]DEBUG HERE 2: buffer.len=32461, unescaped_len=68, input=SELECT+%2A+FROM+service_data.visytel_test+ORDER+BY+visytel_test_id+ASC
[httpz/src/url.zig:110][TID:7108]DEBUG HERE > out=de_id=0&leg_id=0&service_file=%2Fpath%2Fservicefile.lua&data_source=
[httpz/src/url.zig:114][TID:7108]DEBUG HERE 4 b=37, i=7
[httpz/src/url.zig:116][TID:7108]DEBUG HERE 5 enc=2A
[httpz/src/url.zig:117][TID:7108]DEBUG HERE >> out=SELECT &leg_id=0&service_file=%2Fpath%2Fservicefile.lua&data_source=
[httpz/src/url.zig:153][TID:7108]DEBUG HERE 6 out=SELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASC
[httpz/src/request.zig:254][TID:7108]DEBUG 1 value_res.value.len: 68
[httpz/src/request.zig:255][TID:7108]DEBUG 1 buf.len: 32461
[httpz/src/request.zig:256][TID:7108]DEBUG 0.3 fd.keys[0..fd.len]: { /path/serv, uaSELEC,  FROM , vice_data.vi, _test_id AS }
[httpz/src/request.zig:257][TID:7108]DEBUG 0.2 fd.values[0..fd.len]: { cefile.,  , e, /path/servicefile.lua, POSTGRESQL }
[httpz/src/request.zig:259][TID:7108]DEBUG 0.4 fd.keys[0..fd.len]: { /path/serv, uaSELEC,  FROM , vice_data.vi, _test_id AS }
[httpz/src/request.zig:260][TID:7108]DEBUG 2 buf.len: 32393
[httpz/src/request.zig:263][TID:7108]DEBUG 1 key:value: sql_statement:SELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASC
[httpz/src/request.zig:264][TID:7108]DEBUG 1 fd.keys[0..fd.len]: { /path/serv, uaSELEC,  FROM , vice_data.vi, _test_id AS }
[httpz/src/key_value.zig:37][TID:7108]add [5]key:value sql_statement:SELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASC
[httpz/src/request.zig:267][TID:7108]DEBUG 2 key:value: sql_statement:SELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASC
[httpz/src/request.zig:269][TID:7108]DEBUG 2 fd.keys[0..fd.len]: { /path/serv, uaSELEC,  FROM , vice_data.vi, _test_id AS, sql_statement }
[httpz/src/request.zig:283][TID:7108]DEBUG 2 b: /path/servicefile.luaSELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASCPOSTGRESQL&sql_statement=SELECT+%2A+FROM+service_data.visytel_test+ORDER+BY+visytel_test_id+ASC
[src/global.zig:335][TID:7108]name:field /path/serv:cefile.
[src/global.zig:335][TID:7108]name:field uaSELEC:
[src/global.zig:335][TID:7108]name:field  FROM :e
[src/global.zig:335][TID:7108]name:field vice_data.vi:/path/servicefile.lua
[src/global.zig:335][TID:7108]name:field _test_id AS:POSTGRESQL
[src/global.zig:335][TID:7108]name:field sql_statement:SELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASC
Visytel commented 8 months ago

Extra information that may help:

My setup used .max_form_count = 6 for testing. I just tried with .max_form_count = 250 and the problem still happens.

var server = httpz.ServerCtx(*GlobalContext, *GlobalContext).init(allocator, .{ .address = config.listen_ip, .port = config.listen_port, .request = .{ .max_form_count = 6 } }, &ctx) catch |err| { ....};
defer server.deinit();
Visytel commented 8 months ago

Last message for my day (I am based in Melbourne, Australia). I have managed to reduce my test app to use only your http.zig, removing pg.zig and all my extra code, so I can focus on testing only req.formData(). The problem still happens.

I hope this helps you ...

src/main.zig:

const std = @import("std");
const global = @import("global.zig");

const Allocator = std.mem.Allocator;

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    const t1 = try std.Thread.spawn(.{}, global.start, .{allocator});

    std.debug.print("[{s}:{d}][TID:{d}]test started\n", .{ @src().file, @src().line, std.Thread.getCurrentId()});

    t1.join();
}

src/global.zig:

const std = @import("std");
const httpz = @import("httpz");
const Allocator = std.mem.Allocator;

// Our global state
const GlobalContext = struct {
    allocator: Allocator,
    thread_id: u32,

    pub fn sql_fetch(self: *GlobalContext, req: *httpz.Request, res: *httpz.Response) !void {
        // status code 200 is implicit.

        res.content_type = .TEXT;

        std.debug.print("[{s}:{d}][TID:{d}]raw parameters: {?s}\n", .{ @src().file, @src().line, self.thread_id, req.body()});

        const fd = req.formData() catch |err| {
            std.debug.print("[{s}:{d}][TID:{d}]ERROR test failed to parse form data (error: {})\n", .{@src().file, @src().line, std.Thread.getCurrentId(), err });
            return invalid(res);
        };
        for (fd.keys[0..fd.len], fd.values[0..fd.len]) |name, field| {
            std.debug.print("[{s}:{d}][TID:{d}]name:field {s}:{s}\n", .{@src().file, @src().line, std.Thread.getCurrentId(), name, field });
        }
        return invalid(res);
    }
};

// Started in main.zig
pub fn start(allocator: Allocator) !void {

    // Configuration file should live alongside the executable
    const config_file = "test.json";

    // EXAMPLE test.json:

    //{
    //    "listen_ip": "0.0.0.0",
    //    "listen_port": 8008
    //}

    // Config struct used to parse the JSON; all fields must exist for the configuration to parse successfully
    const Config = struct {
        listen_ip: []const u8,
        listen_port: ?u16,      
    };

    // Variable to store the parsed configuration values
    var config = Config {
        .listen_ip = undefined,
        .listen_port = undefined,   
    };

    const MAX_CONFIG_SIZE = 512;
    const exe_path = try std.fs.selfExePathAlloc(allocator);
    defer allocator.free(exe_path);
    const opt_exe_dir = std.fs.path.dirname(exe_path);

    if (opt_exe_dir) |exe_dir| {
        var paths = [_][]const u8{ exe_dir, config_file };
        const file_path = try std.fs.path.join(allocator, &paths);
        defer allocator.free(file_path);

        const file_contents = std.fs.cwd().readFileAlloc(allocator, file_path, MAX_CONFIG_SIZE) catch |err| {
            std.debug.print("[{s}:{d}][TID:{d}]ERROR test failed to load the configuration file into memory {s} (error: {})\n", .{@src().file, @src().line, std.Thread.getCurrentId(), file_path, err });
            std.debug.print("[{s}:{d}][TID:{d}]test aborted with exit code: 1\n", .{@src().file, @src().line, std.Thread.getCurrentId()});      
            std.process.exit(1);
        };
        defer allocator.free(file_contents);
        const parsed_config = std.json.parseFromSlice(Config, allocator, file_contents, .{.allocate = .alloc_always}) catch |err| {
            std.debug.print("[{s}:{d}][TID:{d}]ERROR test failed to parse the JSON configuration inside {s} (error: {})\n", .{@src().file, @src().line, std.Thread.getCurrentId(), file_path, err });
            std.debug.print("[{s}:{d}][TID:{d}]test aborted with exit code: 1\n", .{@src().file, @src().line, std.Thread.getCurrentId()});      
            std.process.exit(1);
        };  
        defer parsed_config.deinit();   
        config = parsed_config.value;

        var ctx = GlobalContext{
            .allocator = allocator,
            .thread_id = std.Thread.getCurrentId(),
        };

        var server = httpz.ServerCtx(*GlobalContext, *GlobalContext).init(allocator, .{ .address = config.listen_ip, .port = config.listen_port, .request = .{ .max_form_count = 6 } }, &ctx) catch |err| {
            std.debug.print("[{s}:{d}][TID:{d}]ERROR test failed to setup a server on {s} port {?d}: {}\n", .{@src().file, @src().line, std.Thread.getCurrentId(), config.listen_ip, config.listen_port, err});
            std.debug.print("[{s}:{d}][TID:{d}]test aborted with exit code: 1\n", .{@src().file, @src().line, std.Thread.getCurrentId()});  
            std.process.exit(1);
        };
        defer server.deinit();
        var router = server.router();
        router.post("/sql_fetch/", GlobalContext.sql_fetch);
        return server.listen() catch |err| {
            std.debug.print("[{s}:{d}][TID:{d}]ERROR test failed to setup a server on {s} port {?d}: {}\n", .{@src().file, @src().line, std.Thread.getCurrentId(), config.listen_ip, config.listen_port, err});
            std.debug.print("[{s}:{d}][TID:{d}]test aborted with exit code: 1\n", .{@src().file, @src().line, std.Thread.getCurrentId()});      
            std.process.exit(1);
        };      
    }
}

fn notAuthorized(res: *httpz.Response) void {
    res.status = 401;
    res.body = "{AUTHENTICATION FAILED}";
}

fn invalid(res: *httpz.Response) void {
    res.status = 400;
    res.body = "{INVALID}";
}

Locust.io test app (flood_test.py):

import time
from locust import HttpUser, task, between

# Run: ulimit -n 10000 prior to starting: locust -f flood_test.py
# Then goto webpage: http://x.x.x.x:8089
# Tested 800 users with spawn rate of 10 to http://y.y.y.y:8008 (where the zig test app is running)

class sqlFetchUser(HttpUser):
    wait_time = between(1, 50)

    @task
    def sql_fetch(self):
        d = {"session_id":"abc_123","node_id":0,"leg_id":0,"service_file":"/path/servicefile.lua","data_source":"POSTGRESQL","sql_statement":"SELECT * FROM service_data.visytel_test ORDER BY visytel_test_id ASC"}
        self.client.post("/sql_fetch/", data=d)

build.zig:

const std = @import("std");

// Although this function looks imperative, note that its job is to
// declaratively construct a build graph that will be executed by an external
// runner.
pub fn build(b: *std.Build) void {

    // Standard target options allows the person running `zig build` to choose
    // what target to build for. Here we do not override the defaults, which
    // means any target is allowed, and the default is native. Other options
    // for restricting supported target set are available.
    const target = b.standardTargetOptions(.{});

    // Standard optimization options allow the person running `zig build` to select
    // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not
    // set a preferred release mode, allowing the user to decide how to optimize.
    const optimize = b.standardOptimizeOption(.{});

    const dep_opts = .{ .target = target, .optimize = optimize };
    const metrics_module = b.dependency("metrics", dep_opts).module("metrics");
    const websocket_module = b.dependency("websocket", dep_opts).module("websocket");

    const httpz_module = b.addModule("httpz", .{
        .root_source_file = .{ .path = "httpz/src/httpz.zig" },
        .imports = &.{
            .{ .name = "metrics", .module = metrics_module },
            .{ .name = "websocket", .module = websocket_module }
        },
    }); 

    const exe = b.addExecutable(.{
        .name = "test",
        .root_source_file = .{ .path = "src/main.zig" },
        .target = target,
        .optimize = optimize,
    });

    exe.root_module.addImport("httpz", httpz_module);
    exe.root_module.addImport("metrics", metrics_module);
    exe.root_module.addImport("websocket", websocket_module);

    // This declares intent for the executable to be installed into the
    // standard location when the user invokes the "install" step (the default
    // step when running `zig build`).
    b.installArtifact(exe);

    // This *creates* a Run step in the build graph, to be executed when another
    // step is evaluated that depends on it. The next line below will establish
    // such a dependency.
    const run_cmd = b.addRunArtifact(exe);

    // By making the run step depend on the install step, it will be run from the
    // installation directory rather than directly from within the cache directory.
    // This is not necessary, however, if the application depends on other installed
    // files, this ensures they will be present and in the expected location.
    run_cmd.step.dependOn(b.getInstallStep());

    // This allows the user to pass arguments to the application in the build
    // command itself, like this: `zig build run -- arg1 arg2 etc`
    if (b.args) |args| {
        run_cmd.addArgs(args);
    }

    // This creates a build step. It will be visible in the `zig build --help` menu,
    // and can be selected like this: `zig build run`
    // This will evaluate the `run` step rather than the default, which is "install".
    const run_step = b.step("run", "Run the app");
    run_step.dependOn(&run_cmd.step);

    const exe_unit_tests = b.addTest(.{
        .root_source_file = .{ .path = "src/main.zig" },
        .target = target,
        .optimize = optimize,
    });

    const run_exe_unit_tests = b.addRunArtifact(exe_unit_tests);

    // Similar to creating the run step earlier, this exposes a `test` step to
    // the `zig build --help` menu, providing a way for the user to request
    // running the unit tests.
    const test_step = b.step("test", "Run unit tests");
    test_step.dependOn(&run_exe_unit_tests.step);
}

build.zig.zon:

.{
    .name = "test",
    // This is a [Semantic Version](https://semver.org/).
    // In a future version of Zig it will be used for package deduplication.
    .version = "1.0.0",

    // This field is optional.
    // This is currently advisory only; Zig does not yet do anything
    // with this value.
    //.minimum_zig_version = "0.11.0",

    // This field is optional.
    // Each dependency must either provide a `url` and `hash`, or a `path`.
    // `zig build --fetch` can be used to fetch all dependencies of a package, recursively.
    // Once all dependencies are fetched, `zig build` no longer requires
    // internet connectivity.
    .dependencies = .{
        // See `zig fetch --save <url>` for a command-line interface for adding dependencies.
        //.example = .{
        //    // When updating this field to a new URL, be sure to delete the corresponding
        //    // `hash`, otherwise you are communicating that you expect to find the old hash at
        //    // the new URL.
        //    .url = "https://example.com/foo.tar.gz",
        //
        //    // This is computed from the file contents of the directory of files that is
        //    // obtained after fetching `url` and applying the inclusion rules given by
        //    // `paths`.
        //    //
        //    // This field is the source of truth; packages do not come from a `url`; they
        //    // come from a `hash`. `url` is just one of many possible mirrors for how to
        //    // obtain a package matching this `hash`.
        //    //
        //    // Uses the [multihash](https://multiformats.io/multihash/) format.
        //    .hash = "...",
        //
        //    // When this is provided, the package is found in a directory relative to the
        //    // build root. In this case the package's hash is irrelevant and therefore not
        //    // computed. This field and `url` are mutually exclusive.
        //    .path = "foo",
        //},
        //.httpz = .{
        //    .url = "git+https://github.com/karlseguin/http.zig#master",
        //    .hash = "12200dc73b9b5ae69e551be2a66fca705296accfb4e0d926f8b3ffef1166f0305773",
        //},
        .metrics = .{
          .url = "https://github.com/karlseguin/metrics.zig/archive/dcdffa672acc62e543048920ce02f0f8d1038320.tar.gz",
          .hash = "1220554c5571b775273097cc28db1c67e2bc765d37a40ca6d876790b78c9c68dda4f"
        },
        .websocket = .{
          .url = "https://github.com/karlseguin/websocket.zig/archive/1f2c4a56c642dab52fe12cdda1bd3f56865d1f86.tar.gz",
          .hash = "1220ce168e550f8904364acd0a72f5cafd40caa08a50ba83aac50b97ba700d7bcf20"
        },      
    },

    // Specifies the set of files and directories that are included in this package.
    // Only files and directories listed here are included in the `hash` that
    // is computed for this package.
    // Paths are relative to the build root. Use the empty string (`""`) to refer to
    // the build root itself.
    // A directory listed here means that all files within, recursively, are included.
    .paths = .{
        // This makes *all* files, recursively, included in this package. It is generally
        // better to explicitly list the files and directories instead, to insure that
        // fetching from tarballs, file system paths, and version control all result
        // in the same contents hash.
        "",
        // For example...
        //"build.zig",
        //"build.zig.zon",
        //"src",
        //"LICENSE",
        //"README.md",
    },
}
karlseguin commented 8 months ago

Thanks for all that. Can reproduce it. I'll update once I know more.

karlseguin commented 8 months ago

Spare was pointing to the wrong place, so unescaping was overwriting part of the original request, in this case parts of the body that don't need unescaping (and which the parsed keys/values still reference directly).

If the request is stored in buf:

'P', 'O', 'S', 'T', ' ', '/' ....
...
'C', 'o', 'n', 't', 'e', 'n', 't', '-', 'L', 'e', 'n', 'g', 't', 'h', ':', ' ', '2', '6', '\r', '\n',
'\r', '\n'
's', 'e', 's', 's', 'i', 'o', 'n', '_', 'i', 'd', '=', '1', '2', '3', '&', 'p', 'a', 't', 'h', '=', '%', '2', 'F', 't', 'm', 'p',  
*SPARE*

Then what should happen when we decode the body is that we end up with the last value, %2Ftmp, being decoded into the spare space of buf (since we have spare room), so it would look like:

'P', 'O', 'S', 'T', ' ', '/' ....
...
'C', 'o', 'n', 't', 'e', 'n', 't', '-', 'L', 'e', 'n', 'g', 't', 'h', ':', ' ', '2', '6', '\r', '\n',
'\r', '\n'
's', 'e', 's', 's', 'i', 'o', 'n', '_', 'i', 'd', '=', '1', '2', '3', '&', 'p', 'a', 't', 'h', '=', '%', '2', 'F', 't', 'm', 'p',
'/', 't','m','p'
*SPARE*

And then the keys and values of fd can reference parts of buf. keys[0], values[0] and keys[1] all point to the original request, since they don't need unescaping. values[1] points to the unescaped value that we placed into spare:

'P', 'O', 'S', 'T', ' ', '/' ....
...
'C', 'o', 'n', 't', 'e', 'n', 't', '-', 'L', 'e', 'n', 'g', 't', 'h', ':', ' ', '2', '6', '\r', '\n',
'\r', '\n'

's', 'e', 's', 's', 'i', 'o', 'n', '_', 'i', 'd', '=', '1', '2', '3', '&', 'p', 'a', 't', 'h', '=', '%', '2', 'F', 't', 'm', 'p',
^keys[0]                                               ^values[0]          ^keys[1]
'/', 't','m','p'
^values[1]

*SPARE*

It's obviously important that spare is properly set. In this case, it wasn't, so what was happening is that the value was being unescaped into the original request, with the end result looking like:

'P', 'O', 'S', 'T', ' ', '/' ....
...
'C', 'o', 'n', 't', 'e', 'n', 't', '-', 'L', 'e', 'n', 'g', 't', 'h', ':', ' ', '2', '6', '\r', '\n',
'\r', '\n'

'/', 't', 'm', 'p', 'i', 'o', 'n', '_', 'i', 'd', '=', '1', '2', '3', '&', 'p', 'a', 't', 'h', '=', '%', '2', 'F', 't', 'm', 'p',
^keys[0]  & values[1]                                  ^values[0]          ^keys[1]

*SPARE*
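The corruption above can be modeled outside of Zig. Here's a minimal Python sketch of the same in-place percent-decoding (the `unescape_into` function and buffer layout are illustrative, not http.zig's actual code): decoding into a correctly placed spare region leaves the request intact, while decoding into a destination that aliases the request clobbers bytes that other key/value slices still point into.

```python
# Sketch (illustrative, not http.zig code) of why the destination offset
# for in-place percent-decoding matters.

def unescape_into(buf, src_start, src_end, dst_start):
    """Percent-decode buf[src_start:src_end] into buf at dst_start.
    Returns the decoded length."""
    out = dst_start
    i = src_start
    while i < src_end:
        if buf[i:i + 1] == b"%" and i + 2 < src_end:
            buf[out:out + 1] = bytes([int(buf[i + 1:i + 3], 16)])
            i += 3
        else:
            buf[out:out + 1] = buf[i:i + 1]
            i += 1
        out += 1
    return out - dst_start

body = bytearray(b"session_id=123&path=%2Ftmp")

# Correct: decode "%2Ftmp" (bytes 20..26) into spare room *after* the request.
buf = body + bytearray(10)
n = unescape_into(buf, 20, 26, len(body))
assert bytes(buf[len(body):len(body) + n]) == b"/tmp"
assert bytes(buf[:26]) == b"session_id=123&path=%2Ftmp"  # request intact

# Buggy: "spare" mistakenly points at the start of the request, so the
# decoded value overwrites bytes that keys[0] etc. still reference.
buf2 = body + bytearray(10)
unescape_into(buf2, 20, 26, 0)
assert bytes(buf2[:26]) == b"/tmpion_id=123&path=%2Ftmp"
```

The buggy case reproduces exactly the `'/', 't', 'm', 'p', 'i', 'o', 'n', '_', ...` layout shown in the last diagram. With the fix, spare begins right after the request bytes, so decoded values never alias slices the parsed keys/values still point into.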

This issue only impacts URL decoding of the body (a relatively new feature) and only happens if the entire body isn't read in a single shot, which can certainly happen. I'm surprised this happened with Locust but not other benchmarking tools. I know nothing about Locust, but I wonder if it intentionally breaks up the packets to better simulate real-world patterns.

It's possible that, behind a reverse proxy, this issue could be harder to reproduce. Reverse proxies often buffer the entire request, and thus it's more likely that http.zig's socket buffer would contain the entire request.

Visytel commented 8 months ago

Karl,

I must thank you for the quick turn around and clear feedback on this issue. I have rerun the tests at my end and this is now fixed.

As for your question around Locust, to be honest I only know the basics around this since I used it to test a Python (FASTAPI) implementation to find any issues so I thought it would be good to run the same tests against my new Zig version (using your HTTP & PG modules). It is also nice to see graphs of performance. FYI: The performance is excellent vs. FASTAPI (Python) - which I expected.

All the best,

Andrew