ziglang / zig

General-purpose programming language and toolchain for maintaining robust, optimal, and reusable software.
https://ziglang.org
MIT License

ability to write code that is agnostic of blocking vs async I/O #1778

Closed andrewrk closed 5 years ago

andrewrk commented 5 years ago

This is a pretty popular blog post from 2015 that helps illustrate one way of thinking about concurrency and async I/O: What Color is Your Function?

Here's a proposal along these lines. I think we should at least consider it.

Depends on:

Goals:

In summary, writing Zig code should be maximally portable. That means proper Zig libraries could work in constrained memory environments, multi-threaded environments, single-threaded environments, blocking I/O applications, evented I/O applications, inside OS kernels, and userland applications. And not only work correctly in these environments, but work optimally.

Implementation:

Standard library functions that perform I/O, such as std.os.File.write, have bodies that look like this:

pub fn write(file: *File, bytes: []const u8) usize {
    // Note that this is an if with comptime-known condition.
    if (std.event.loop.instance) |event_loop| {
        // var, not const: the event loop stores the result in msg
        var msg = std.event.fs.Msg{
            .Write = std.event.fs.Msg.Write{
                .handle = file.handle,
                .ptr = bytes.ptr,
                .len = bytes.len,
                .result = undefined,
            },
        };
        suspend {
            event_loop.queueFsWrite(@handle(), &msg);
        }
        return msg.Write.result;
    } else {
        // blocking call
        return std.os.linux.write(file.handle, bytes.ptr, bytes.len);
    }
}
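
The comptime-known `if` is what keeps the unused path out of the binary. Here is a minimal sketch in present-day Zig without the async keywords, assuming a hypothetical local `io_mode` constant and a stand-in `write` (neither is the standard library API from the proposal). The untaken branch is never analyzed, so even a `@compileError` inside it does not fire:

```zig
const std = @import("std");

// Hypothetical stand-ins for illustration; not the std API from the proposal.
const IoMode = enum { Blocking, Evented };
const io_mode: IoMode = .Blocking;

fn write(bytes: []const u8) usize {
    // `io_mode` is comptime-known, so only one branch is analyzed and compiled.
    if (io_mode == .Evented) {
        // Dead in this build: never analyzed, so this error is not reported.
        @compileError("evented path should not be compiled in a blocking build");
    } else {
        // Stand-in for the blocking syscall: pretend every byte was written.
        return bytes.len;
    }
}

pub fn main() void {
    std.debug.print("wrote {d} bytes\n", .{write("hello\n")});
}
```

Changing `io_mode` to `.Evented` makes the build fail with the message above, which shows that exactly one of the two paths is compiled.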

In std/event/loop.zig:

threadlocal var per_thread_instance: ?*Loop = null;
var global_state: Loop = undefined;
const io_mode = @fieldOrDefault(@import("root"), "io_mode", IoMode.Blocking);
const default_instance: ?*Loop = switch (io_mode) {
    .Blocking => null,
    .Evented => &global_state,
    .Mixed => per_thread_instance,
};
const instance: ?*Loop = @fieldOrDefault(@import("root"), "event_loop", default_instance);

In the root source file, pub const io_mode determines whether the application is 100% blocking, 100% evented, or mixed (per-thread). If nothing is specified then the application is 100% blocking.
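
`@fieldOrDefault` is a builtin imagined for this proposal. As a rough sketch of the same lookup using the existing `@hasDecl` builtin (the names `IoMode` and `mode` are assumptions for illustration), a library could read an optional declaration from the root source file like this:

```zig
const std = @import("std");
const root = @import("root");

pub const IoMode = enum { Blocking, Evented, Mixed };

// Use the root file's `io_mode` if it declares one, otherwise default to
// blocking. The library-side name differs from the root-side name so that a
// file that is both root and library does not refer to itself.
pub const mode: IoMode = if (@hasDecl(root, "io_mode")) root.io_mode else .Blocking;

// A root source file could opt in with:
//     pub const io_mode = .Evented;

pub fn main() void {
    std.debug.print("mode = {s}\n", .{@tagName(mode)});
}
```

Because this file declares no `io_mode`, the lookup falls back to `.Blocking`, matching the default described above.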

Or an application can take even more control, and set the event loop instance directly. This would potentially be used for OS kernels, which need an event loop specific to their own code.

When the I/O mode is Mixed, the standard library event loop implementation sets a thread-local variable, per_thread_instance, to the event loop instance pointer in each worker thread. The "main thread" that calls run also sets this thread-local variable to the event loop instance pointer. In this way, sections of the codebase can be isolated from each other: threads that are not in the event loop's thread pool get blocking I/O (or potentially belong to a different event loop), while threads belonging to a given event loop's thread pool find their owner instance.
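
The per-thread lookup can be sketched with a plain `threadlocal` variable and `std.Thread`. This is only an illustration under assumptions: `Loop` is a hypothetical stand-in with no real event loop behind it, and the program must be built in the default multi-threaded mode.

```zig
const std = @import("std");

// Hypothetical stand-in for an event loop; only its identity matters here.
const Loop = struct { id: u32 };

// Each thread gets its own copy, initially null (meaning blocking I/O).
threadlocal var per_thread_instance: ?*Loop = null;

fn currentLoopId() ?u32 {
    return if (per_thread_instance) |loop| loop.id else null;
}

// Simulates a worker in the loop's thread pool registering its owner loop.
fn worker(loop: *Loop, seen: *?u32) void {
    per_thread_instance = loop;
    seen.* = currentLoopId();
}

pub fn main() !void {
    var loop = Loop{ .id = 1 };
    var seen: ?u32 = null;

    const thread = try std.Thread.spawn(.{}, worker, .{ &loop, &seen });
    thread.join();

    // The worker found its loop; this thread never registered one.
    std.debug.print("worker: {any}, main: {any}\n", .{ seen, currentLoopId() });
}
```

The worker thread sees the loop it registered, while the spawning thread still sees null and would therefore take the blocking path.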

Now let's look at some everyday code that wants to call write:

fn foo() void {
    const rc = file.write("hello\n");
}

// assume this is in root source file
pub const io_mode = .Evented;

Because io_mode is Evented, the write function ends up calling suspend and is therefore a coroutine. It follows that foo is also a coroutine, since it calls write.

When a coroutine calls a coroutine in this manner, it does a tail-async-await coroutine call using its own stack.

But what if the code wants to express possible concurrency?

fn foo() void {
    var future1 = async file1.write("hello1\n");
    var future2 = async file2.write("hello2\n");
    const rc1 = await future1;
    const rc2 = await future2;
}

async is a keyword that takes any expression, which could be a block, but in this case is a function call. The async expression itself returns a coroutine frame type, which supports await. The inner expression result is stored in the frame, and is the result when using await. I'll elaborate more on async blocks later.

If an application is compiled with IoMode.Blocking, then the write function is blocking. When every call inside an async expression is blocking, the result of the async expression is simply the result of the inner expression. The types of future1 and future2 remain the same as before for consistency, but in the generated code they are just the result values, and the await expressions are no-ops. The function is essentially rewritten as:

fn foo() void {
    const rc1 = file1.write("hello1\n");
    const rc2 = file2.write("hello2\n");
}

Which makes foo blocking as well.

What about a CPU bound task?

fn areTheyEqual() bool {
    var pi_frame = async blk: {
        std.event.loop.startCpuTask();
        break :blk calculatePi();
    };
    var e_frame = async blk: {
        std.event.loop.startCpuTask();
        break :blk calculateE();
    };
    const pi = await pi_frame;
    const e = await e_frame;
    return pi == e;
}

Here, startCpuTask is defined as:

fn startCpuTask() void {
    if (@import("builtin").is_single_threaded) {
        return;
    } else if (std.event.loop.instance) |event_loop| {
        suspend {
            event_loop.onNextTick(@handle());
        }
    }
}

So, if you build this function in multi-threaded mode, with io_mode != IoMode.Blocking, the async expression suspends in the startCpuTask and then gets resumed by the event loop on another thread. areTheyEqual becomes a coroutine. So even though calculatePi and calculateE are blocking functions, they end up executing in different threads.

If you build this application in --single-threaded mode, startCpuTask ends up being return;. It is thus not a coroutine. And so the async expressions in areTheyEqual have only blocking calls, which means they turn into normal expressions, and after the function is analyzed, it looks like this:

fn areTheyEqual() bool {
    const pi = calculatePi();
    const e = calculateE();
    return pi == e;
}
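
The single-threaded collapse can be approximated today with `builtin.single_threaded`, which is comptime-known. The sketch below is an assumption-laden analogue, not the proposed mechanism: it uses OS threads via `std.Thread` instead of coroutines resumed by an event loop, and `sumTo` is a hypothetical stand-in for a CPU-bound task. Unlike the coroutine version, spawning a thread can fail, so the multi-threaded branch returns an error union.

```zig
const std = @import("std");
const builtin = @import("builtin");

// Stand-in for a CPU-bound task: sums the integers 1..n into `out`.
fn sumTo(n: u64, out: *u64) void {
    var total: u64 = 0;
    var i: u64 = 1;
    while (i <= n) : (i += 1) total += i;
    out.* = total;
}

fn sumBoth(a: *u64, b: *u64) !void {
    if (builtin.single_threaded) {
        // Comptime-known branch: no thread code is compiled in at all.
        sumTo(1000, a);
        sumTo(2000, b);
    } else {
        const t1 = try std.Thread.spawn(.{}, sumTo, .{ @as(u64, 1000), a });
        defer t1.join(); // also runs if the second spawn fails
        const t2 = try std.Thread.spawn(.{}, sumTo, .{ @as(u64, 2000), b });
        defer t2.join();
    }
}

pub fn main() !void {
    var a: u64 = 0;
    var b: u64 = 0;
    try sumBoth(&a, &b);
    std.debug.print("a = {d}, b = {d}\n", .{ a, b });
}
```

Building with the single-threaded option selects the first branch, and the function reduces to two sequential calls, much like the rewritten areTheyEqual above.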

daurnimator commented 5 years ago

Ability to write code that works in an event-driven or blocking context. For example, libraries should be able to use OS features such as the file system and networking without having an opinion about whether to use blocking or event-based APIs.

This is a great goal. Being able to write a library that e.g. talks to redis, without caring if it's calling async or sync functions is great: it means that the library becomes a non-blocking redis library if run from the right place!

In an application that never creates an event loop, there should be no code generated to deal with event loops.

I think the answer here depends: some code will need an event loop. However this can be made transparent: the caller doesn't necessarily need to know. One example I had recently was DNS lookups: you want to do both A and AAAA lookups, and would like to parallelize them:

fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();
    var t1 = newthread { r.lookupA(host) };
    var t2 = newthread { r.lookupAAAA(host) };
    waitFor(t1, t2);
    return r;
}

This example has no need to yield back to the main loop: the event loop can happen directly inside of this function. However in an existing main loop, waiting on the records should be done at the top level.

In an application that never uses blocking I/O, there should be no runtime overhead associated with detecting the event loop, and no code generated at all to do blocking I/O.

If possible this would be good. Though maybe not possible?

Code can even express concurrency, such as two independent writes, and then wait for them both to be done, and then if this code is used from a blocking I/O application, with --single-threaded, it is as if it were implemented fully in a blocking fashion.

I covered this above.


Language Level

I think you just need the following functions on coroutines at a language level.

Standard Library

I'm finding it difficult to express what I'm suggesting. Hopefully this sample pseudo-zig-code makes sense:

const JobList = std.LinkedList(void);
struct Job {
  node: JobList.Node, // intrusive linked list
  co: Coroutine,
  waitingOn: []WaitArg,
  readyWaits: ArrayList(WaitArg),
  onComplete: ?fn (*Job) void,

  pub fn init(f: fn(void) void, onComplete: ?fn (*Job) void) Job {
    return Job{
      .node = JobList.Node.init(undefined),
      .co = coroutine.create(self.allocator, @stackRequired(f), startJob), // something here to pass f
      .waitingOn = null,
      .onComplete = onComplete,
    }
  }
}

threadlocal var currentLoop: ?&eventLoop = null;
threadlocal var currentJob: ?&Job = null;

struct eventLoop {
  allocator: *Allocator,
  pendingJobs: JobList,

  pub fn empty(self:*eventLoop) bool {
    return self.pendingJobs.len == 0;
  }

  pub fn step(self: *eventLoop) !void {
    var ready: []WaitArg = undefined;
    if (currentLoop != null) {
      // Let parent event loop do the waiting
      ready = wait([]WaitArg{ { .EventLoop = &self } });
    } else {
      // There is no event loop above us, do the OS-level polling here
      ready = someOsPoll(self.miscPrivateFields); // e.g. epoll, poll, select... whatever platform API is best
    }

    for (ready) |wait| {
      for (findJobsThatWant(wait)) |job| {
        job.readyWaits.add(wait); // on failure could just ignore, if not using edge-triggered events it will trigger again on next step() call
        try self.pendingJobs.append(job);
      }
    }

    var oldLoop = currentLoop;
    currentLoop = self;
    for (self.pendingJobs) |job| {
      var oldJob = currentJob;
      currentJob = &job;
      job.waitingOn = job.co.resume(job.readyWaits.toSlice());
      currentJob = oldJob;
      job.readyWaits.shrink(0);
      if (job.co.complete) {
        self.pendingJobs.remove(job);
        job.onComplete();
        continue;
      }
      // update miscPrivateFields based on job.waitingOn
    }
    currentLoop = oldLoop;
  }

  pub fn loop(self: *eventLoop) !void {
    while (!self.empty()) {
      try self.step();
    }
  }

  pub fn addJob(self: *eventLoop, job: *Job) void {
    self.pendingJobs.append(job);
  }

  pub fn newJob(self: *eventLoop, f: fn(void) void) !*Job {
    var job = try self.allocator.create(Job.init(f, allocator.destroy));
    addJob(self, job);
    return job;
  }
}

const WaitType = enum { Timeout, PollFD, EventLoop }; // + any other OS primitives to wait on
const WaitArg = union(WaitType) {
  Timeout: f64,
  PollFD: struct {
    fd: i32,
    events: i32, // mask of POLLIN|POLLOUT|POLLPRI and possibly other poll() flags
  },
  EventLoop: *eventLoop, // We build in support so that event loops are themselves wait-able. This allows multiple layers of loops to be composable.
};
pub fn wait(arg: []WaitArg) ![]WaitArg {
  if (currentJob.co == coroutine.running()) { // if we are inside of a managed coroutine then let the loop do the work
    return @yield(arg);
  } else {
    // otherwise create a new single-use loop. could have a global one prepared.
    var loop = eventLoop.init(); // stack allocated
    defer loop.destroy();
    var job = Job.init(wait); // stack allocated job
    loop.addJob(&job);
    try loop.loop();
    // filter arg based on job.readyWaits
    return arg;
  }
}

Then the current std.os.posixWrite could be written as (changed the EAGAIN branch, previously was unreachable):

pub fn posixWrite(fd: i32, bytes: []const u8) !void {
    const max_bytes_len = 0x7ffff000;

    var index: usize = 0;
    while (index < bytes.len) {
        const amt_to_write = math.min(bytes.len - index, usize(max_bytes_len));
        const rc = posix.write(fd, bytes.ptr + index, amt_to_write);
        const write_err = posix.getErrno(rc);
        switch (write_err) {
            0 => {
                index += rc;
                continue;
            },
            posix.EINTR => continue,
            posix.EAGAIN => {
              // wait until the fd is writable, then retry the write
              _ = try wait([]WaitArg{ { .PollFD = { .fd = fd, .events = POLLOUT } } });
              continue;
            },
            posix.EINVAL => unreachable,
            posix.EFAULT => unreachable,
            posix.EBADF => unreachable, // always a race condition
            posix.EDESTADDRREQ => unreachable, // connect was never called
            posix.EDQUOT => return PosixWriteError.DiskQuota,
            posix.EFBIG => return PosixWriteError.FileTooBig,
            posix.EIO => return PosixWriteError.InputOutput,
            posix.ENOSPC => return PosixWriteError.NoSpaceLeft,
            posix.EPERM => return PosixWriteError.AccessDenied,
            posix.EPIPE => return PosixWriteError.BrokenPipe,
            else => return unexpectedErrorPosix(write_err),
        }
    }
}

andrewrk commented 5 years ago

I don't think this code can accomplish the stated goals:

fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();
    var t1 = newthread { r.lookupA(host) };
    var t2 = newthread { r.lookupAAAA(host) };
    waitFor(t1, t2);
    return r;
}

I'd need to understand what newthread and waitFor are doing. It looks like they allocate resources, and have the potential for failure, yet the parameters accept no allocator and the return type indicates no error.

If this code were to be used by a target which had no event loop abilities, this function needs to be purely blocking, which means that it does the lookups serially, and there should be no possibility of OutOfMemory error. Zig code cannot use an event loop without one being set up, in userland. There is no event loop runtime set by the language.

In an application that never uses blocking I/O, there should be no runtime overhead associated with detecting the event loop, and no code generated at all to do blocking I/O.

If possible this would be good. Though maybe not possible?

I just outlined how this is possible. Do you see a flaw that would prevent me from implementing it?


It looks like you're advocating for stackful coroutines, while my proposal is built on the premise of stackless coroutines.

I have a strong stance against stackful coroutines. This kind of concurrency is not really better than creating threads, from a performance and scheduling perspective. One may as well use the OS for what it was designed for. But my main issue with it is that it makes parallelism too intentional. We're back to the threading model for concurrency rather than a more powerful abstraction. Stackless coroutines are also known as "continuation passing style". This allows us to have coroutines without heap allocation, or at least with limited heap allocation. In Zig we don't get to allocate memory for free; it comes at the cost of an extra allocator parameter, and a possible OutOfMemory error, neither of which are typically required for blocking calls.

What I have outlined here is a way that will work in Zig, and it even allows expressing "optional concurrency". It's not clear to me what problems you are pointing out in my proposal, or what you are trying to solve with your pseudocode.

I think in order to be convincing here, you'd have to show me a use case of userland code you would want to write that you cannot express using my proposal, but that your counter-proposal makes possible.

daurnimator commented 5 years ago

I'd need to understand what newthread and waitFor are doing. It looks like they allocate resources, and have the potential for failure, yet the parameters accept no allocator and the return type indicates no error.

If this code were to be used by a target which had no event loop abilities, this function needs to be purely blocking, which means that it does the lookups serially, and there should be no possibility of OutOfMemory error. Zig code cannot use an event loop without one being set up, in userland. There is no event loop runtime set by the language.

I've rewritten that example to use my earlier imaginary library code:

fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();

    // create a stack allocated loop
    var loop = eventLoop.init();
    defer loop.destroy();

    // create two stack-allocated Jobs
    // need to figure out style for passing a callback + argument(s)
    // for now I'm pretending we have closures with some sort of block syntax.
    var j1 = Job.init({ r.lookupA(host) }, null);
    var j2 = Job.init({ r.lookupAAAA(host) }, null);

    // Add our new jobs to the loop
    loop.addJob(&j1);
    loop.addJob(&j2);

    // run loop until no jobs left.
    // as our loop only has 2 jobs, this is equivalent to waiting for our two jobs
    // 
    // it could throw an error if e.g. epoll_wait() syscall fails.
    // I also would propagate any errors in a job to here.
    // i.e. if a Job returns an error, it should be returned from .loop
    // preferably along with a pointer to the Job that errored.
    try loop.loop();

    return r;
}

I have a strong stance against stackful coroutines. This kind of concurrency is not really better than creating threads, from a performance and scheduling perspective.

I disagree. It's been widely shown that OS threads are too expensive to have e.g. one per HTTP request.

But my main issue with it is that it makes parallelism too intentional.

From the Zen of Zig: "Communicate intent precisely."

However with my proposal, the user only needs to know about the abstraction when they wish to have things happening in parallel. The rest of the code may remain oblivious.

daurnimator commented 5 years ago

What I have outlined here is a way that will work in Zig, and it even allows expressing "optional concurrency". It's not clear to me what problems you are pointing out in my proposal, or what you are trying to solve with your pseudocode.

I'm slowly coming around to your proposal.

I'm wondering how it would work for interop with non-zig code:

The global io_mode seems a bit weird to me, especially as Zig is often used to create libraries called from other languages. Could it be a per-function flag?

roobie commented 5 years ago

I think using a per-function flag defeats the mitigation of the red/blue color discrepancy of functions. If I understand correctly, the io_mode would be relevant mostly for exes? Since libraries would either:

hcff commented 5 years ago

Maybe I'm wrong, but I don't think hiding the asynchronicity of the function is in line with Zig's goals. Knowing if an operation is async or not is important, because if the event loop is single-threaded, a blocking operation means that nothing else can happen during that operation.

For example, imagine that you're working on an async web application written in Zig, and you're looking at the account creation part.

fn handle_account_creation(login: []const u8, password: []const u8) !void {
    if (try sql("SELECT count(*) FROM users WHERE login = :?", login) != 0) {
        return error.UserAlreadyExist;
    }
    try sql("INSERT INTO users VALUES (:?, ...)", login, ...);
}

If the sql function is async, then this (pseudo-)code has a potential race condition if two account creation requests with the same login happen at the same time. In that case, the check for whether the user already exists may pass for both requests, and you end up with two users with the same login. If asynchronicity is implicit, then it's hard to know whether sql will actually suspend: you have to check its implementation, and if it depends on io_mode, you have to check that value too. If it's explicit, then the await is a clear indication that some other code may run while the function is suspended.

andrewrk commented 5 years ago

Thanks for the specific example code.

If the sql function is async, then this (pseudo-)code has a potential race condition if two account creation requests with the same login happen at the same time.

Isn't this the same problem if the code is blocking? If you do a thread per request, then handle_account_creation could be called from more than one thread, so you have this race condition either way.

From the perspective of the handle_account_creation, the sql operations are equivalent to using a global variable for a temporary variable, since the sql database is global to multiple calls to handle_account_creation. Although technically they would be OK in a single-threaded, blocking environment, functions which use global state as temporary values should generally be avoided, and when used, should be carefully documented under what conditions it is appropriate to call them.

More bluntly, I would reject this as a valid use case because the code is inherently flawed, and thus I don't think it should really be considered a guiding example for this proposal.

roobie commented 5 years ago

Most systems that have to manage external state (SQL, KV stores, etc.) that is distributed in some fashion have this problem, irrespective of whether they use single-threaded blocking calls or full async, since it's possible to run more than one instance talking to the data APIs. Handling the case where one user might send two requests to be created in the database should be done in the database, e.g. with UNIQUE indices, because that's one of the responsibilities of the RDBMS.
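
As an in-memory analogue of a UNIQUE constraint (a sketch only; `Users` is a hypothetical type, and no real database is involved), the check and the insert can be made a single step so that no other caller can run between them:

```zig
const std = @import("std");

// Hypothetical in-memory stand-in for a `users` table with a unique login.
const Users = struct {
    mutex: std.Thread.Mutex = .{},
    logins: std.StringHashMap(void),

    fn init(allocator: std.mem.Allocator) Users {
        return .{ .logins = std.StringHashMap(void).init(allocator) };
    }

    fn deinit(self: *Users) void {
        self.logins.deinit();
    }

    // Check and insert happen as one step under the lock, so two concurrent
    // callers with the same login cannot both succeed. The map stores the
    // slice as given, so `login` must outlive the map.
    fn create(self: *Users, login: []const u8) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        const slot = try self.logins.getOrPut(login);
        if (slot.found_existing) return error.UserAlreadyExists;
    }
};

pub fn main() !void {
    var users = Users.init(std.heap.page_allocator);
    defer users.deinit();

    try users.create("alice");
    users.create("alice") catch |err| {
        std.debug.print("second attempt rejected: {s}\n", .{@errorName(err)});
    };
}
```

The separate SELECT followed by INSERT in the earlier example corresponds to calling `contains` and then `put` without holding the lock across both, which leaves the same window open whether the calls block or suspend.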

An interesting example with regards to this discussion is full async coroutines in Lua that are used in a fully procedural manner.

roobie commented 5 years ago

One thing I am uncertain of with regard to the Lua example is that it is not obvious how, or if, it is possible to run the async functions in parallel. In the example, the two database queries are executed sequentially, whereas I can think of situations where I would like to execute them in parallel, so that they both wait on I/O simultaneously.

Doing a bit of digging in the pgmoon library, it does support parallel execution (from the README):

This method also supports sending multiple queries at once by separating them with a ;. The number of queries executed is returned as a second return value after the result object. When more than one query is executed then the result object changes slightly. It becomes a array table holding all the individual results:

local res, num_queries = pg:query([[
  select id, name from users;
  select id, title from posts
]])

daurnimator commented 5 years ago

One thing I am uncertain of with regard to the Lua example is that it is not obvious how, or if, it is possible to run the async functions in parallel. In the example, the two database queries are executed sequentially, whereas I can think of situations where I would like to execute them in parallel, so that they both wait on I/O simultaneously.

In openresty (which is where pgmoon runs), you create a new stackful coroutine with e.g. ngx.timer.at(0, function() ...... end)

Doing a bit of digging in the pgmoon library, it does support parallel execution

I think this is a postgres feature rather than a pgmoon one.

galvez commented 5 years ago

Really like the proposal @andrewrk -- am new to Zig but falling in love very quickly :)

andrewrk commented 5 years ago

Done with the merge of #3033. Opening new issues for the remaining details to be ironed out.