Cloudef / zig-aio

io_uring like asynchronous API and coroutine powered IO tasks for zig
https://cloudef.github.io/zig-aio/
MIT License
175 stars 6 forks source link

Schedulers for HTTP Server on Multiple Threads. #16

Open bhansconnect opened 1 month ago

bhansconnect commented 1 month ago

I'm guessing I'm fundamentally doing this wrong/in a poor form. Currently I am just trying to get a simple http server built on top of sockets and zig-aio. The performance tends to be quite sporadic. Overall, it is really poor.

On top of that, multiple threads look to make the performance worse. Or at a minimum don't really change the perf.

I have an 8 core machine, so I leave half of the cores for the server and half for a benchmarking tool. I have been benchmarking this super slim server impl with wrk: wrk -t 4 -c 32 -d 10 http://127.0.0.1:8000. I also use -c 512 to do a high connection test.

All tests on macos.

Overall, I'm mostly curious if I am using the library fundamentally wrong? Do you have any general suggestions on the most performant way to do something like this? I understand that my current impl will be missing tons of advanced scheduler features (like a work stealing queue), but given the simple requests and roughly even distributions of connections, I would expect that not to be a big deal.

I get the feeling that the issue I am hitting are fundamental to even the single threaded case. The only time I saw really good perf numbers was when running the handlers via a coro.ThreadPool. But the good perf would fall apart when the handlers block. Cause the thread pool would block fully instead of yielding to the scheduler.

Thanks for your work on this library, I totally understand if something like this is out of scope currently. I'm personally hoping that I am doing something silly in the code below and fixing it will give crazy perf boosts.

const std = @import("std");
const aio = @import("aio");
const coro = @import("coro");
const log = std.log.scoped(.coro_aio);

pub const aio_options: aio.Options = .{
    .debug = false, // set to true to enable debug logs
};

pub const coro_options: coro.Options = .{
    .debug = false, // set to true to enable debug logs
};

pub const std_options: std.Options = .{
    .log_level = .err,
};

pub fn main() !void {
    var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
    defer _ = gpa.deinit();

    var socket: std.posix.socket_t = undefined;
    try coro.io.single(aio.Socket{
        .domain = std.posix.AF.INET,
        .flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
        .protocol = std.posix.IPPROTO.TCP,
        .out_socket = &socket,
    });

    const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 8000);
    try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    if (@hasDecl(std.posix.SO, "REUSEPORT")) {
        try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
    }
    try std.posix.bind(socket, &address.any, address.getOsSockLen());
    try std.posix.listen(socket, 128);

    var threads = try gpa.allocator().alloc(std.Thread, 4);
    for (0..threads.len) |i| {
        threads[i] = try std.Thread.spawn(.{}, server_thread, .{ gpa.allocator(), socket, i });
    }
    for (0..threads.len) |i| {
        threads[i].join();
    }
}

fn server_thread(allocator: std.mem.Allocator, socket: std.posix.socket_t, thread_id: usize) !void {
    log.info("Launching Server Thread {}\n", .{thread_id});
    var scheduler = try coro.Scheduler.init(allocator, .{});
    defer scheduler.deinit();

    var tasks = std.ArrayList(HandlerTask).init(allocator);
    var have_tasks: coro.ResetEvent = .{};
    _ = try scheduler.spawn(accept_requests, .{ &scheduler, socket, &tasks, &have_tasks, thread_id }, .{});
    _ = try scheduler.spawn(clean_up_tasks, .{ &tasks, &have_tasks }, .{});

    try scheduler.run(.wait);
}

fn accept_requests(scheduler: *coro.Scheduler, socket: std.posix.socket_t, tasks: *std.ArrayList(HandlerTask), have_tasks: *coro.ResetEvent, thread_id: usize) !void {
    while (true) {
        log.info("Loop accept\n", .{});
        var client_socket: std.posix.socket_t = undefined;
        try coro.io.single(aio.Accept{ .socket = socket, .out_socket = &client_socket });

        const task = try scheduler.spawn(handler, .{ client_socket, thread_id }, .{});
        try tasks.append(task);
        if (!have_tasks.is_set) {
            have_tasks.set();
        }
    }
}

// Is this actually needed? Is there a better way to do this?
// Can tasks clean up after themselves?
fn clean_up_tasks(tasks: *std.ArrayList(HandlerTask), have_tasks: *coro.ResetEvent) !void {
    try have_tasks.wait();
    while (true) {
        if (tasks.items.len == 0) {
            have_tasks.reset();
            try have_tasks.wait();
        }
        var i: usize = 0;
        while (i < tasks.items.len) {
            // Ensure we break for the scheduler to run.
            try coro.io.single(aio.Nop{ .ident = 0 });
            if (tasks.items[i].isComplete()) {
                log.debug("Cleaning up a task\n", .{});
                // This will deinit the function and clean up resources.
                const task = tasks.swapRemove(i);
                task.complete(.wait);
            } else {
                i += 1;
            }
        }
    }
}

const HandlerTask = coro.Task.Generic(void);
fn handler(socket: std.posix.socket_t, thread_id: usize) void {
    log.info("Starting new handler on {}\n", .{thread_id});
    defer log.info("Closing handler\n", .{});

    // I should do a proper check for keepalive here?
    // And http headers in general I guess.
    var buf: [1024]u8 = undefined;
    var len: usize = 0;
    while (true) {
        coro.io.single(aio.Recv{ .socket = socket, .buffer = &buf, .out_read = &len }) catch break;
        log.debug("request:\n{s}\n\n", .{buf[0..len]});

        // This is the fake costly part, sleep for a bit (pretend this is some server web request)
        // Sleep 20ms
        // coro.io.single(aio.Timeout{ .ns = 20 * 1_000 * 1_000 }) catch break;

        const response =
            "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 13\r\n\r\nHello, World!";
        coro.io.single(aio.Send{ .socket = socket, .buffer = response }) catch break;
    }

    coro.io.single(aio.CloseSocket{ .socket = socket }) catch return;
}
Cloudef commented 3 weeks ago

On macos the posix backend is used, which doesn't have that great perf. You'd want to test on linux with io_uring which this library mainly targets. iocp may work okay as well. As for clean_up_tasks, I think it would be good idea to add ability so tasks can be detached so it won't be needed, in fact you may want to replace nop in cleanup_tasks with delay (and move it from the inner while to the outer one) as it seems that loop will be very busy otherwise causing lots of context switches.

Cloudef commented 2 weeks ago

It is now possible to detach tasks.