littledan / linux-aio

How to use the Linux AIO feature
324 stars 42 forks source link

Note, Linux AIO is now subsumed by the io_uring API (tutorial, LWN coverage). The below explanation is mostly useful for old kernels.

Introduction

The Asynchronous Input/Output (AIO) interface allows many I/O requests to be submitted in parallel without the overhead of a thread per request. The purpose of this document is to explain how to use the Linux AIO interface, namely the function family io_setup, io_submit, io_getevents, io_destroy. Currently, the AIO interface is best for O_DIRECT access to a raw block device like a disk, flash drive or storage array.

What is AIO?

Input and output functions involve a device, like a disk or flash drive, which works much slower than the CPU. Consequently, the CPU can be doing other things while waiting for an operation on the device to complete. There are multiple ways to handle this:

Asynchronous I/O can be considered “lower level” than synchronous I/O because it does not make use of a system-provided concept of threads to organize its computation. However, it is often more efficient to use AIO than synchronous I/O due the nondeterministic overhead of threads.

The Linux AIO model

The Linux AIO model is used as follows:

  1. Open an I/O context to submit and reap I/O requests from.
  2. Create one or more request objects and set them up to represent the desired operation
  3. Submit these requests to the I/O context, which will send them down to the device driver to process on the device
  4. Reap completions from the I/O context in the form of event completion objects,
  5. Return to step 2 as needed.

I/O context

io_context_t is a pointer-sized opaque datatype that represents an “AIO context”. It can be safely passed around by value. Requests in the form of a struct iocb are submitted to an io_context_t and completions are read from the io_context_t. Internally, this structure contains a queue of completed requests. The length of the queue forms an upper bound on the number of concurrent requests which may be submitted to the io_context_t.

To create a new io_context_t, use the function

int io_setup(int maxevents, io_context_t *ctxp);

Here, ctxp is the output and maxevents is the input. The function creates an io_context_t with an internal queue of length maxevents. To deallocate an io_context_t, use

int io_destroy(io_context_t ctx);

There is a system-wide maximum number of allocated io_context_t objects, set at 65536.

An io_context_t object can be shared between threads, both for submission and completion. No guarantees are provided about ordering of submission and completion with respect to interaction from multiple threads. There may be performance implications from sharing io_context_t objects between threads.

Submitting requests

struct iocb represents a single request for a read or write operation. The following struct shows a simplification on the struct definition; a full definition is found in <libaio.h> within the libaio source code.

struct iocb {
    void *data;
    short aio_lio_opcode;
    int aio_fildes;

    union {
        struct {
            void *buf;
            unsigned long nbytes;
            long long offset;
        } c;
    } u;
};

The meaning of the fields is as follows: data is a pointer to a user-defined object used to represent the operation

The convenience functions io_prep_pread and io_prep_pwrite can be used to initialize a struct iocb. New operations are sent to the device with io_submit.

int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);

io_submit allows an array of pointers to struct iocbs to be submitted all at once. In this function call, nr is the length of the ios array. If multiple operations are sent in one array, then no ordering guarantees are given between the iocbs. Submitting in larger batches sometimes results in a performance improvement due to a reduction in CPU usage. A performance improvement also sometimes results from keeping many I/Os ‘in flight’ simultaneously.

If the submission includes too many iocbs such that the internal queue of the io_context_t would overfill on completion, then io_submit will return a non-zero number and set errno to EAGAIN.

When used under the right conditions, io_submit should not block. However, when used in certain ways, it may block, undermining the purpose of asynchronous I/O. If this is a problem for your application, be sure to use the O_DIRECT flag when opening a file, and operate on a raw block device. Work is ongoing to fix the problem.

Processing results

Completions read from an io_context_t are of the type struct io_event, which contains the following relevant fields.

struct io_event {
    void *data;
    struct iocb *obj;
    long long res;
};

Here, data is the same data pointer that was passed in with the struct iocb, and obj is the original struct iocb. res is the return value of the read or write.

Completions are reaped with io_getevents.

int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

This function has a good number of parameters, so an explanation is in order:

The return value represents how many completions were reported, i.e. how much of events was written. The return value will be between 0 and nr. The return value may be lower than min_nr if the timeout expires; if the timeout is NULL, then the return value will be between min_nr and nr.

The parameters give a broad range of flexibility in how AIO can be used.

Even if min_nr = 0 or 1, it is useful to make nr a bit bigger for performance reasons: more than one event may be already complete, and it could be processed without multiple calls to io_getevents. The only cost of a larger nr value library is that the user must allocate a larger array of events and be prepared to accept them.

Use with epoll

Any iocb can be set to notify an eventfd on completion using the libaio function io_set_eventfd. The eventfd can be put in an epoll object. When the eventfd is triggered, then the io_getevents function can be called on the corresponding io_context_t.

There is no way to use this API to trigger an eventfd only when multiple operations are complete--the eventfd will always be triggered on the first operation. Consequently, as described in the previous section, it will often make sense to use min_nr = 1 when using io_getevents after an epoll_wait call that indicates an eventfd involved in AIO.

Performance considerations

Alternatives to Linux AIO

Sample code

Below is some example code which uses Linux AIO. I wrote it at Google, so it uses the Google glog logging library and the Google gflags command-line flags library, as well as a loose interpretation of Google’s C++ coding conventions. When compiling it with gcc, pass -laio to dynamically link with libaio. (It isn’t included in glibc, so it must be explicitly included.)

// Code written by Daniel Ehrenberg, released into the public domain

#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <libaio.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>

DEFINE_string(path, "/tmp/testfile", "Path to the file to manipulate");
DEFINE_int32(file_size, 1000, "Length of file in 4k blocks");
DEFINE_int32(concurrent_requests, 100, "Number of concurrent requests");
DEFINE_int32(min_nr, 1, "min_nr");
DEFINE_int32(max_nr, 1, "max_nr");

// The size of operation that will occur on the device
static const int kPageSize = 4096;

class AIORequest {
 public:
  int* buffer_;

  virtual void Complete(int res) = 0;

  AIORequest() {
    int ret = posix_memalign(reinterpret_cast<void**>(&buffer_),
                             kPageSize, kPageSize);
    CHECK_EQ(ret, 0);
  }

  virtual ~AIORequest() {
    free(buffer_);
  }
};

class Adder {
 public:
  virtual void Add(int amount) = 0;

  virtual ~Adder() { };
};

class AIOReadRequest : public AIORequest {
 private:
  Adder* adder_;

 public:
  AIOReadRequest(Adder* adder) : AIORequest(), adder_(adder) { }

  virtual void Complete(int res) {
    CHECK_EQ(res, kPageSize) << "Read incomplete or error " << res;
    int value = buffer_[0];
    LOG(INFO) << "Read of " << value << " completed";
    adder_->Add(value);
  }
};

class AIOWriteRequest : public AIORequest {
 private:
  int value_;

 public:
  AIOWriteRequest(int value) : AIORequest(), value_(value) {
    buffer_[0] = value;
  }

  virtual void Complete(int res) {
    CHECK_EQ(res, kPageSize) << "Write incomplete or error " << res;
    LOG(INFO) << "Write of " << value_ << " completed";
  }
};

class AIOAdder : public Adder {
 public:
  int fd_;
  io_context_t ioctx_;
  int counter_;
  int reap_counter_;
  int sum_;
  int length_;

  AIOAdder(int length)
      : ioctx_(0), counter_(0), reap_counter_(0), sum_(0), length_(length) { }

  void Init() {
    LOG(INFO) << "Opening file";
    fd_ = open(FLAGS_path.c_str(), O_RDWR | O_DIRECT | O_CREAT, 0644);
    PCHECK(fd_ >= 0) << "Error opening file";
    LOG(INFO) << "Allocating enough space for the sum";
    PCHECK(fallocate(fd_, 0, 0, kPageSize * length_) >= 0) << "Error in fallocate";
    LOG(INFO) << "Setting up the io context";
    PCHECK(io_setup(100, &ioctx_) >= 0) << "Error in io_setup";
  }

  virtual void Add(int amount) {
    sum_ += amount;
    LOG(INFO) << "Adding " << amount << " for a total of " << sum_;
  }

  void SubmitWrite() {
    LOG(INFO) << "Submitting a write to " << counter_;
    struct iocb iocb;
    struct iocb* iocbs = &iocb;
    AIORequest *req = new AIOWriteRequest(counter_);
    io_prep_pwrite(&iocb, fd_, req->buffer_, kPageSize, counter_ * kPageSize);
    iocb.data = req;
    int res = io_submit(ioctx_, 1, &iocbs);
    CHECK_EQ(res, 1);
  }

  void WriteFile() {
    reap_counter_ = 0;
    for (counter_ = 0; counter_ < length_; counter_++) {
      SubmitWrite();
      Reap();
    }
    ReapRemaining();
  }

  void SubmitRead() {
    LOG(INFO) << "Submitting a read from " << counter_;
    struct iocb iocb;
    struct iocb* iocbs = &iocb;
    AIORequest *req = new AIOReadRequest(this);
    io_prep_pread(&iocb, fd_, req->buffer_, kPageSize, counter_ * kPageSize);
    iocb.data = req;
    int res = io_submit(ioctx_, 1, &iocbs);
    CHECK_EQ(res, 1);
  }

  void ReadFile() {
    reap_counter_ = 0;
    for (counter_ = 0; counter_ < length_; counter_++) {
        SubmitRead();
        Reap();
    }
    ReapRemaining();
  }

  int DoReap(int min_nr) {
    LOG(INFO) << "Reaping between " << min_nr << " and "
              << FLAGS_max_nr << " io_events";
    struct io_event* events = new io_event[FLAGS_max_nr];
    struct timespec timeout;
    timeout.tv_sec = 0;
    timeout.tv_nsec = 100000000;
    int num_events;
    LOG(INFO) << "Calling io_getevents";
    num_events = io_getevents(ioctx_, min_nr, FLAGS_max_nr, events,
                              &timeout);
    LOG(INFO) << "Calling completion function on results";
    for (int i = 0; i < num_events; i++) {
      struct io_event event = events[i];
      AIORequest* req = static_cast<AIORequest*>(event.data);
      req->Complete(event.res);
      delete req;
    }
    delete events;

LOG(INFO) << "Reaped " << num_events << " io_events";
    reap_counter_ += num_events;
    return num_events;
  }

  void Reap() {
    if (counter_ >= FLAGS_min_nr) {
      DoReap(FLAGS_min_nr);
    }
  }

  void ReapRemaining() {
    while (reap_counter_ < length_) {
      DoReap(1);
    }
  }

  ~AIOAdder() {
    LOG(INFO) << "Closing AIO context and file";
    io_destroy(ioctx_);
    close(fd_);
  }

  int Sum() {
    LOG(INFO) << "Writing consecutive integers to file";
    WriteFile();
    LOG(INFO) << "Reading consecutive integers from file";
    ReadFile();
    return sum_;
  }
};

int main(int argc, char* argv[]) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  AIOAdder adder(FLAGS_file_size);
  adder.Init();
  int sum = adder.Sum();
  int expected = (FLAGS_file_size * (FLAGS_file_size - 1)) / 2;
  LOG(INFO) << "AIO is complete";
  CHECK_EQ(sum, expected) << "Expected " << expected << " Got " << sum;
  printf("Successfully calculated that the sum of integers from 0"
         " to %d is %d\n", FLAGS_file_size - 1, sum);
  return 0;
}