InKryption / rpmalloc-zig-port

MIT License
41 stars 4 forks source link

Cross-thread frees cause segfaults #6

Open joadnacer opened 7 months ago

joadnacer commented 7 months ago

The following code takes n, the number of threads, as a command-line argument; if omitted, n defaults to 1.

It then spawns n allocating threads and n freeing threads and transfers allocations from the allocating threads to the freeing threads via an MPMC queue; every thread runs forever in a `while (true)` loop.

Running this with rpmalloc-zig segfaults very quickly, even for n=1.

If you set `use_rpmalloc = false`, the c_allocator is used instead, and the program runs indefinitely for any number of threads without segfaulting.

const std = @import("std");
const rp = @import("rpmalloc.zig");

const testing = std.testing;
const assert = std.debug.assert;
const Value = std.atomic.Value;

// Capacity of the shared hand-off queue; BoundedMpmcQueue requires a power of two.
const queue_size = 1024;
// Queue of allocated byte slices passed from allocating threads to freeing threads.
const Queue = BoundedMpmcQueue([]u8, queue_size);

// Size in bytes of every allocation made by threadAllocWorker.
const alloc_size = 8;

// Selects the allocator under test: rpmalloc when true, std.heap.c_allocator when false.
// no segfault if false
const use_rpmalloc = true;

/// Program entry point.
///
/// Reads the optional first command-line argument as the number of
/// allocating/freeing thread pairs (base-10 `u32`) and hands it to
/// `crossthread`. With no argument a single pair is used. Any further
/// arguments are ignored.
///
/// Errors from argument allocation, integer parsing, or `crossthread` are
/// propagated to the caller.
pub fn main() !void {
    const arg_allocator = std.heap.page_allocator;

    const argv = try std.process.argsAlloc(arg_allocator);
    defer std.process.argsFree(arg_allocator, argv);

    // argv[0] is the program name, so only the remaining entries are real arguments.
    const extra_arg_count = argv.len - 1;

    const thread_count: u32 = if (extra_arg_count == 0)
        1
    else
        try std.fmt.parseInt(u32, argv[1], 10);

    try crossthread(thread_count);
}

/// Runs the cross-thread allocation stress test.
///
/// Initialises rpmalloc, then spawns `num_threads` threads running
/// `threadAllocWorker` and `num_threads` threads running `threadFreeWorker`,
/// all sharing one `Queue` and one allocator (rpmalloc when `use_rpmalloc` is
/// true, otherwise `std.heap.c_allocator`). Finally joins every spawned thread.
///
/// The workers loop until one of their calls fails, so in normal operation
/// the joins block indefinitely.
///
/// Returns an error if rpmalloc initialisation, the handle allocation, or a
/// thread spawn fails.
pub fn crossthread(num_threads: u32) !void {
    const rpmalloc = rp.RPMalloc(.{});
    try rpmalloc.init(null, .{});

    const allocator = if (use_rpmalloc) rpmalloc.allocator() else std.heap.c_allocator;

    var queue = Queue.init();

    // Widen before doubling so a large thread count cannot overflow u32.
    const workers = try std.heap.page_allocator.alloc(std.Thread, @as(usize, num_threads) * 2);
    defer std.heap.page_allocator.free(workers);

    // First half holds the allocating threads, second half the freeing
    // threads, so no handle is overwritten and every slot is initialised.
    const alloc_workers = workers[0..num_threads];
    const free_workers = workers[num_threads..];

    for (alloc_workers, free_workers) |*alloc_worker, *free_worker| {
        alloc_worker.* = try std.Thread.spawn(.{}, threadAllocWorker, .{ allocator, &queue });
        free_worker.* = try std.Thread.spawn(.{}, threadFreeWorker, .{ allocator, &queue });
    }

    // Join all 2 * num_threads threads, not just the first num_threads handles.
    for (workers) |worker| {
        worker.join();
    }
}

/// Producer thread body: allocates `alloc_size`-byte blocks forever and
/// publishes each one on `queue` for another thread to free.
///
/// When `use_rpmalloc` is set, rpmalloc's per-thread initialisation is run
/// first. Returns only if that initialisation or an allocation fails.
pub fn threadAllocWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    if (use_rpmalloc) {
        try rp.RPMalloc(.{}).initThread();
    }

    while (true) {
        const block = try allocator.alloc(u8, alloc_size);

        // Busy-wait until the queue has room for this block.
        var published = queue.tryWrite(block);
        while (!published) {
            published = queue.tryWrite(block);
        }
    }
}

/// Consumer thread body: pops allocations from `queue` forever and frees each
/// one with `allocator`, spinning while the queue is empty.
///
/// When `use_rpmalloc` is set, rpmalloc's per-thread initialisation is run
/// first, exactly as `threadAllocWorker` does. Returns only if that
/// initialisation fails.
pub fn threadFreeWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    // NOTE(review): upstream rpmalloc documents that every thread calling into
    // the allocator must initialise its thread-local state first; this is
    // assumed to hold for the Zig port as well — confirm against the port.
    if (use_rpmalloc) try rp.RPMalloc(.{}).initThread();

    while (true) {
        const alloc = queue.tryRead() orelse continue;

        allocator.free(alloc);
    }
}

/// Array based bounded multiple producer multiple consumer queue
/// This is a Zig port of Dmitry Vyukov's https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
///
/// `T` is the element type. `buffer_size` is the fixed capacity and must be a
/// non-zero power of two, because positions are mapped to slots with a bit
/// mask; any other value is rejected at compile time.
///
/// All position and sequence arithmetic is modular (wrapping), so the queue
/// keeps working after the `usize` counters wrap around.
pub fn BoundedMpmcQueue(comptime T: type, comptime buffer_size: usize) type {
    if (buffer_size == 0 or (buffer_size & (buffer_size - 1)) != 0) {
        @compileError("BoundedMpmcQueue: buffer_size must be a non-zero power of 2");
    }

    const buffer_mask = buffer_size - 1;

    const Cell = struct {
        /// Ticket for this slot: equals the enqueue position when the slot is
        /// free for that position, and that position + 1 once it holds data.
        sequence: Value(usize),
        data: T,
    };

    return struct {
        // Producer and consumer cursors are aligned to separate cache lines.
        enqueue_pos: Value(usize) align(std.atomic.cache_line),
        dequeue_pos: Value(usize) align(std.atomic.cache_line),
        buffer: [buffer_size]Cell,

        const Self = @This();

        /// Returns an empty queue: both cursors at 0 and every cell's
        /// sequence set to its own index. Cell payloads are left undefined.
        pub fn init() Self {
            var buf: [buffer_size]Cell = undefined;

            // Only matters when init is evaluated at comptime. Scale the quota
            // with this instantiation's capacity (loop branch plus call per
            // cell, with headroom) rather than an unrelated file-level constant.
            @setEvalBranchQuota(1000 + 4 * buffer_size);
            for (&buf, 0..) |*cell, i| {
                cell.sequence = Value(usize).init(i);
            }

            return .{
                .enqueue_pos = Value(usize).init(0),
                .dequeue_pos = Value(usize).init(0),
                .buffer = buf,
            };
        }

        /// Attempts to write to the queue, without overwriting any data
        /// Returns `true` if the data is written, `false` if the queue was full
        pub fn tryWrite(self: *Self, data: T) bool {
            var pos = self.enqueue_pos.load(.Monotonic);

            var cell: *Cell = undefined;

            while (true) {
                cell = &self.buffer[pos & buffer_mask];
                const seq = cell.sequence.load(.Acquire);
                // Signed distance between the slot's ticket and our position,
                // computed modulo 2^bits so it stays correct across wrap-around.
                const diff = @as(isize, @bitCast(seq -% pos));

                if (diff == 0 and self.enqueue_pos.cmpxchgWeak(pos, pos +% 1, .Monotonic, .Monotonic) == null) {
                    // Slot is free for `pos` and we claimed the position.
                    break;
                } else if (diff < 0) {
                    // Slot still holds an element from one lap ago: queue is full.
                    return false;
                } else {
                    // Lost the race (or the CAS failed spuriously); retry with a fresh position.
                    pos = self.enqueue_pos.load(.Monotonic);
                }
            }

            cell.data = data;
            // Publish the payload: consumers acquire-load the sequence before reading data.
            cell.sequence.store(pos +% 1, .Release);

            return true;
        }

        /// Attempts to read and remove the head element of the queue
        /// Returns `null` if there was no element to read
        pub fn tryRead(self: *Self) ?T {
            var cell: *Cell = undefined;
            var pos = self.dequeue_pos.load(.Monotonic);

            while (true) {
                cell = &self.buffer[pos & buffer_mask];
                const seq = cell.sequence.load(.Acquire);
                // A readable slot carries ticket `pos + 1`; compare modulo 2^bits.
                const diff = @as(isize, @bitCast(seq -% (pos +% 1)));

                if (diff == 0 and self.dequeue_pos.cmpxchgWeak(pos, pos +% 1, .Monotonic, .Monotonic) == null) {
                    // Slot holds data for `pos` and we claimed the position.
                    break;
                } else if (diff < 0) {
                    // Slot has not been written for this position yet: queue is empty.
                    return null;
                } else {
                    // Lost the race (or the CAS failed spuriously); retry with a fresh position.
                    pos = self.dequeue_pos.load(.Monotonic);
                }
            }

            const res = cell.data;
            // Hand the slot back to producers for the position one lap ahead.
            cell.sequence.store(pos +% buffer_size, .Release);

            return res;
        }
    };
}