rsepassi / zigcoro

A Zig coroutine library
BSD Zero Clause License

Channel for inter-thread communication/sync #22

Closed Pismice closed 1 month ago

Pismice commented 1 month ago

Hello,

I have tried using channels to communicate between threads, like in Go.

But it does not seem possible, because I want to xawait the frame of the xasync that sends through the channel.

Beyond that, I would want channels to stay active the whole time, so that threads can communicate through them during their entire lifespan. The problem is that after the exec.tick loop I can no longer do any work with channels.

Am I doing something wrong, or is it just not possible to use your channels this way?

Here is the code where I tried this, with just one thread trying to communicate back to the main thread:

const std = @import("std");
const libcoro = @import("libcoro");

const BurgerOrder = struct {
    burger: u8,
    fries: u8,
};

pub fn main() !void {
    const allocator = std.heap.page_allocator;
    var exec = libcoro.Executor.init();
    libcoro.initEnv(.{ .stack_allocator = allocator, .executor = &exec });

    // Creation of a type that represents a channel that can pass BurgerOrder values
    const BurgeOrderChanel = libcoro.Channel(BurgerOrder, .{});

    // Creation of a channel that can pass Burger Orders
    var road_between_restaurant_and_house = BurgeOrderChanel.init(null);

    const delivery_man_occupations = try std.Thread.spawn(.{}, deliver, .{&road_between_restaurant_and_house});
    //const hungry_client_occupations = try std.Thread.spawn(.{}, goTo, .{ 2, 3 });

    const hungry_client = try libcoro.xasync(recvr, .{&road_between_restaurant_and_house}, null);
    defer hungry_client.deinit();

    while (exec.tick()) {
        // While there are deliveries to do, they will be made
        // After that point, the delivery man does not take any more orders
    }

    // ! FIXME undefined delivery_man because the stack is now declared inside the thread
    libcoro.xawait(delivery_man); // Delivery man finished his job
    const order = libcoro.xawait(hungry_client); // Hungry client received his order
    std.debug.print("Burger = {} | Fries = {}", .{ order.burger, order.fries });
}

fn deliver(road: *libcoro.Channel) void {
    // do some work ...
    // prepare his bike ...
    const delivery_man = try libcoro.xasync(sender, .{ road, BurgerOrder{ .burger = 2, .fries = 3 } }, null);
    defer delivery_man.deinit();
}

fn sender(chan: anytype, order: BurgerOrder) void {
    defer chan.close();
    chan.send(order) catch unreachable;
}

fn recvr(chan: anytype) BurgerOrder {
    return chan.recv() orelse BurgerOrder{ .burger = 0, .fries = 0 }; // The delivery might fail to arrive
}

rsepassi commented 1 month ago

Channels in the library are for communication between coroutines on the same thread, not across threads. They are inter-coroutine communication mechanisms, not inter-thread.

If you want a multithreaded runtime, I would suggest creating N threads up front, each with its own event loop, and then having structured communication between them. But that is outside the scope of this library. You may want to look at some of the queues within libxev if you don’t want to write a queue/ringbuffer yourself.
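
For readers who do want to write the queue themselves, here is a minimal sketch of what such a cross-thread queue could look like. It uses only `std.Thread.Mutex`, `std.Thread.Condition`, and `std.Thread.spawn` from the Zig standard library. `ThreadQueue`, `OrderQueue`, and the `deliver` body are hypothetical names made up for this illustration; none of it is part of libcoro or libxev, and it blocks the calling OS thread rather than suspending a coroutine. It assumes a Zig version from around the time of this issue (0.12 or 0.13).

```zig
const std = @import("std");

const BurgerOrder = struct {
    burger: u8,
    fries: u8,
};

/// Hypothetical helper (not a libcoro or libxev type): a bounded, blocking,
/// thread-safe FIFO backed by a fixed-size ring buffer.
fn ThreadQueue(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();

        mutex: std.Thread.Mutex = .{},
        not_empty: std.Thread.Condition = .{},
        not_full: std.Thread.Condition = .{},
        items: [capacity]T = undefined,
        head: usize = 0, // index of the oldest stored item
        len: usize = 0, // number of items currently stored
        closed: bool = false,

        /// Blocks while the queue is full; fails once the queue is closed.
        pub fn send(self: *Self, value: T) error{Closed}!void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.len == capacity and !self.closed) self.not_full.wait(&self.mutex);
            if (self.closed) return error.Closed;
            self.items[(self.head + self.len) % capacity] = value;
            self.len += 1;
            self.not_empty.signal();
        }

        /// Blocks while the queue is empty; returns null once the queue is
        /// closed and every remaining item has been drained.
        pub fn recv(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.len == 0 and !self.closed) self.not_empty.wait(&self.mutex);
            if (self.len == 0) return null;
            const value = self.items[self.head];
            self.head = (self.head + 1) % capacity;
            self.len -= 1;
            self.not_full.signal();
            return value;
        }

        /// Marks the queue closed and wakes every blocked sender and receiver.
        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.closed = true;
            self.not_empty.broadcast();
            self.not_full.broadcast();
        }
    };
}

const OrderQueue = ThreadQueue(BurgerOrder, 4);

// Runs on its own OS thread and hands one order to the main thread.
fn deliver(road: *OrderQueue) void {
    defer road.close();
    // send only fails after close(), which this thread calls last.
    road.send(.{ .burger = 2, .fries = 3 }) catch unreachable;
}

pub fn main() !void {
    var road = OrderQueue{};
    const delivery_man = try std.Thread.spawn(.{}, deliver, .{&road});
    defer delivery_man.join();

    // Blocks the main thread until an order arrives or the queue is closed.
    while (road.recv()) |order| {
        std.debug.print("Burger = {} | Fries = {}\n", .{ order.burger, order.fries });
    }
}
```

In the layout suggested above, each thread would run its own executor, and a queue like this (or one of the libxev queues) would sit between them. A thread that also drives coroutines would probably want a non-blocking poll instead of the blocking `recv` shown here, so that waiting on the queue does not stall its event loop.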

Pismice commented 1 month ago

Thank you :)