Open mrjbq7 opened 3 months ago
I am not entirely sure how the write callback works in Zig, to know if the Java version that just attempts a channel.write(bytes)
is doing less work than Zig is doing in the io_uring backend. We would need to follow the implementations deeper to see if it's a JVM io_uring thing, or if my PingPong.java is actually less work on Linux.
I believe, but am not entirely sure, that Java is using epoll()
on Linux.
If I build with the epoll
backend on Linux:
diff --git a/src/main.zig b/src/main.zig
index 74f451a..1cfb61c 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -36,7 +36,7 @@ pub const Backend = enum {
/// Returns a recommend default backend from inspecting the system.
pub fn default() Backend {
return @as(?Backend, switch (builtin.os.tag) {
- .linux => .io_uring,
+ .linux => .epoll,
.ios, .macos => .kqueue,
.wasi => .wasi_poll,
.windows => .iocp,
Performance is much slower than the io_uring backend.
$ ./zig-out/bench/ping-pongs
info: 97655.99 roundtrips/s
info: 5.12 seconds total
And it is not the difference between the incremental self.state = (self.state + 1) % (PING.len)
, and the shortcut that I took in Java, because this is only a tiny bit faster:
diff --git a/src/bench/ping-pongs.zig b/src/bench/ping-pongs.zig
index fec07c0..153a92c 100644
--- a/src/bench/ping-pongs.zig
+++ b/src/bench/ping-pongs.zig
@@ -141,33 +141,25 @@ const Client = struct {
l: *xev.Loop,
c: *xev.Completion,
socket: xev.TCP,
- buf: xev.ReadBuffer,
+ _: xev.ReadBuffer,
r: xev.TCP.ReadError!usize,
) xev.CallbackAction {
const self = self_.?;
const n = r catch unreachable;
- const data = buf.slice[0..n];
-
- // Count the number of pings in our message
- var i: usize = 0;
- while (i < n) : (i += 1) {
- assert(data[i] == PING[self.state]);
- self.state = (self.state + 1) % (PING.len);
- if (self.state == 0) {
- self.pongs += 1;
-
- // If we're done then exit
- if (self.pongs > 500_000) {
- socket.shutdown(l, c, Client, self, shutdownCallback);
- return .disarm;
- }
-
- // Send another ping
- const c_ping = self.completion_pool.create() catch unreachable;
- socket.write(l, c_ping, .{ .slice = PING[0..PING.len] }, Client, self, writeCallback);
- }
+ self.state += n;
+ self.pongs += (self.state / PING.len);
+ self.state = (self.state % PING.len);
+
+ // If we're done then exit
+ if (self.pongs > 500_000) {
+ socket.shutdown(l, c, Client, self, shutdownCallback);
+ return .disarm;
}
+ // Send another ping
+ const c_ping = self.completion_pool.create() catch unreachable;
+ socket.write(l, c_ping, .{ .slice = PING[0..PING.len] }, Client, self, writeCallback);
+
// Read again
return .rearm;
}
I also confirmed that it doesn't matter if Java looks at the readBuffer:
for (int i = 0; i < n; i++) {
byte ch = buffer.get();
assert ch == ping.get(state);
state = (state + 1) % ping.limit();
if (state == 0) {
pings += 1;
if (pings > MAX_PINGS) {
System.out.printf("Finished %d pings\n", MAX_PINGS);
long end = System.currentTimeMillis();
double seconds = (double) (end - start) / 1000;
double perSecond = MAX_PINGS / seconds;
System.out.printf("%.2f roundtrips/s\n", perSecond);
System.out.printf("%.2f seconds total", seconds);
System.exit(0);
}
}
}
I modified one of the examples in zig-aio
to do a comparable single-process "ping-pongs" benchmark and it goes as fast as Java on Linux (50% faster than libxev), and uses a io_uring
backend.
info: 254049.52 roundtrips/s
info: 1.97 seconds total
Here's the code for it:
const std = @import("std");
const aio = @import("aio");
const coro = @import("coro");
const log = std.log.scoped(.coro_aio);
pub const aio_options: aio.Options = .{
.debug = false, // set to true to enable debug logs
};
pub const coro_options: coro.Options = .{
.debug = false, // set to true to enable debug logs
};
pub const std_options: std.Options = .{
.log_level = .debug,
};
/// Coroutine: the "pong" side of the single-process benchmark. Creates a
/// listening TCP socket on 0.0.0.0:3131, signals `startup` once it is
/// accepting, then echoes every received buffer back to the one client.
fn server(startup: *coro.ResetEvent) !void {
var socket: std.posix.socket_t = undefined;
// Create the listening TCP socket through the async I/O layer.
try coro.io.single(aio.Socket{
.domain = std.posix.AF.INET,
.flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
.protocol = std.posix.IPPROTO.TCP,
.out_socket = &socket,
});
const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 3131);
// Allow immediate rebinding between benchmark runs (avoid TIME_WAIT failures).
try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
if (@hasDecl(std.posix.SO, "REUSEPORT")) {
try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
}
try std.posix.bind(socket, &address.any, address.getOsSockLen());
try std.posix.listen(socket, 128);
// Tell the client coroutine the server is ready to accept.
startup.set();
var client_sock: std.posix.socket_t = undefined;
try coro.io.single(aio.Accept{ .socket = socket, .out_socket = &client_sock });
var buf: [1024]u8 = undefined;
var len: usize = 0;
// Echo loop: receive into `buf`, then send back exactly `len` bytes.
// The loop only exits via an error propagated by `try` (e.g. peer close).
while (true) {
try coro.io.single(aio.Recv{ .socket = client_sock, .buffer = &buf, .out_read = &len });
try coro.io.single(aio.Send{ .socket = client_sock, .buffer = buf[0..len], .link = .soft });
}
// NOTE(review): this cleanup sits after an infinite loop and so appears
// unreachable — verify this compiles/was intended; sockets are otherwise
// reclaimed on process exit.
try coro.io.multi(.{
aio.CloseSocket{ .socket = client_sock, .link = .soft },
aio.CloseSocket{ .socket = socket },
});
}
/// Coroutine: the "ping" side of the benchmark. Waits on `startup` until the
/// server is listening, connects to 127.0.0.1:3131, then repeatedly sends
/// "PING" and waits for the echo until more than 500_000 pongs are counted,
/// finally logging roundtrips/s and total elapsed seconds.
fn client(startup: *coro.ResetEvent) !void {
var socket: std.posix.socket_t = undefined;
try coro.io.single(aio.Socket{
.domain = std.posix.AF.INET,
.flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
.protocol = std.posix.IPPROTO.TCP,
.out_socket = &socket,
});
// Block until the server has bound and is listening.
try startup.wait();
const address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 3131);
try coro.io.single(aio.Connect{
.socket = socket,
.addr = &address.any,
.addrlen = address.getOsSockLen(),
});
const start_time = try std.time.Instant.now();
// `state` carries the byte remainder of a partially-received "PING";
// `pongs` counts completed 4-byte round trips.
var state: usize = 0;
var pongs: u64 = 0;
while (true) {
var buf: [1024]u8 = undefined;
var len: usize = 0;
try coro.io.single(aio.Send{ .socket = socket, .buffer = "PING" });
try coro.io.single(aio.Recv{ .socket = socket, .buffer = &buf, .out_read = &len });
// Count echoes by byte arithmetic rather than inspecting the payload:
// every full 4 bytes received is one pong (4 == "PING".len).
state += len;
pongs += (state / 4);
state = (state % 4);
// If we're done then exit
if (pongs > 500_000) {
break;
}
}
const end_time = try std.time.Instant.now();
// `since` returns nanoseconds; convert to seconds for the report.
const elapsed = @as(f64, @floatFromInt(end_time.since(start_time)));
std.log.info("{d:.2} roundtrips/s", .{@as(f64, @floatFromInt(pongs)) / (elapsed / 1e9)});
std.log.info("{d:.2} seconds total", .{elapsed / 1e9});
try coro.io.single(aio.CloseSocket{ .socket = socket });
}
/// Entry point: spawns the client and server coroutines on one scheduler
/// and runs the event loop until both finish, making this a single-process
/// ping/pong benchmark.
pub fn main() !void {
// var mem: [4096 * 1024]u8 = undefined;
// var fba = std.heap.FixedBufferAllocator.init(&mem);
var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
defer _ = gpa.deinit();
var scheduler = try coro.Scheduler.init(gpa.allocator(), .{});
defer scheduler.deinit();
// Shared handshake event: server sets it once listening, client waits on it.
var startup: coro.ResetEvent = .{};
_ = try scheduler.spawn(client, .{&startup}, .{});
_ = try scheduler.spawn(server, .{&startup}, .{});
try scheduler.run(.wait);
}
It's possible the io_uring
backend is just not optimal — rather than there being a missing optimization for the single-process case, it could just be that the reduced latency exposes some underlying issue in the backend.
I believe the libxev caches the current time each loop, so I added that to the Java version, but that only reduced its performance by 1-2%.
I was beginning to use
libxev
in Zig, and looked at the src/bench/ping-pongs.zig
benchmark, and was comparing the performance results with Java. I found a possibly missing performance optimization when the reader and writer are in the same process on Linux. On Linux (12th Gen Intel Core i7-1260P), the Java version is 50% faster.
On macOS (MacBookPro, M2 Max, 64 GB), the results are roughly the same between Zig and Java.
Note: I also split apart the
ping-pongs
binary into a ping
and a pong
and implemented both in Java and Zig, and the performance on both Linux and macOS is the same when running in different processes. Java is not faster, so it makes me think there might be a missing optimization when both read and write occur in the same process in the io_uring
backend. Here's the
PingPong.java
: