/*
 * Entry point of the copy example: opens the source (argv[1]) and the
 * destination (argv[2]), initialises the ring, looks up the source size,
 * runs copy_file() and finally releases both descriptors and the ring.
 * On the path shown here the result of copy_file() is returned; the "..."
 * branches are error paths that this excerpt elides.
 */
int main(int argc, char *argv[])
{
struct io_uring ring;
off_t insize;
int ret;
// argv[1] and argv[2] are both used below, so at least three arguments are needed
if (argc < 3) {
...
}
// open the source read-only; infd is declared outside this excerpt
infd = open(argv[1], O_RDONLY);
if (infd < 0) {
...
}
// open the destination write-only: create if missing, truncate if present, mode 0644
outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (outfd < 0) {
...
}
// initialise the ring with QD entries; a non-zero result takes the elided path
if (setup_context(QD, &ring))
...
// store the size of the input in insize; a non-zero result takes the elided path
if (get_file_size(infd, &insize))
...
// copy insize bytes through the ring
ret = copy_file(&ring, insize);
close(infd);
close(outfd);
// release the ring set up by setup_context()
io_uring_queue_exit(&ring);
return ret;
}
这里的关键是setup_context(), 用来初始化io_uring.
setup_context
用来初始化queue
/*
 * Initialise `ring` through io_uring_queue_init() with `entries` entries
 * and flags set to 0.
 * Returns 0 when the initialisation does not report an error; the failure
 * branch is elided in this excerpt.
 */
static int setup_context(unsigned entries, struct io_uring *ring)
{
int ret;
// main() passes QD as `entries` (the original note gives QD as 64)
ret = io_uring_queue_init(entries, ring, 0);
// a negative result signals failure; its handling is elided here
if (ret < 0) {
...
}
return 0;
}
拷贝文件
/*
 * Skeleton of the copy routine for `insize` bytes over `ring`.
 * Only the outline is shown: a main loop that runs while bytes remain to
 * be read (insize) or written (write_left), followed by a loop that runs
 * while `writes` is non-zero. Loop bodies and the return are elided.
 */
static int copy_file(struct io_uring *ring, off_t insize)
{
unsigned long reads, writes;
// completion queue entry handed back by the ring
struct io_uring_cqe *cqe;
off_t write_left, offset;
int ret;
// bytes that still have to be written; starts at the full input size
write_left = insize;
writes = reads = offset = 0;
// main loop: keep going while anything is left to read or to write
while(insize || write_left) {
....
}
// second loop: runs until the writes counter drops back to zero
while(writes) {
...
}
}
看这个结构,不用多说,很自然,一边写,一边读,如果还剩余写的部分,那么剩余的部分继续写
读写文件的过程
// Main copy loop: runs until nothing is left to read (insize) or write (write_left).
while (insize || write_left) {
unsigned long had_reads;
int got_comp;
/*
* Queue up as many reads as we can
*/
// remember the counter so we can tell whether this pass queued anything new
had_reads = reads;
while (insize) {
...
}
// taken only when reads differs from its value before the loop above
if (had_reads != reads) {
...
}
/*
* Queue is full at this point. Find at least one completion.
*/
// got_comp is reset for every pass of the outer loop
got_comp = 0;
while (write_left) {
...
}
}
reads 是指已提交但尚未完成的读请求数,writes 是指已提交但尚未完成的写请求数;前面指定了队列深度QD,两者之和不能超过它
while(insize),如果还有剩余的字节数没读取完毕,那么需要读,而且尽可能的多读
如果这次读和上次读相比,有增加,需要提交(if (had_reads != reads))
while(insize)
// Queue read requests until the input is exhausted or the ring has no room left.
while (insize) {
off_t this_size = insize;
// stop once reads + writes has reached the limit QD
if (reads + writes >= QD)
break;
// cap a single request at BS bytes (BS is defined outside this excerpt;
// the original note gives it as 32 KB)
if (this_size > BS)
this_size = BS;
// nothing left to request
else if (!this_size)
break;
// queue_read() returns non-zero on failure; stop queueing in that case
if (queue_read(ring, this_size, offset))
break;
// bytes of the input that still have to be requested
insize -= this_size;
// file offset at which the next read will start
offset += this_size;
reads++;
}
读的次数 + 写的次数 不能超过QD, 超过就不提交IO
一次最多读取BS字节(按上面代码注释为32KB), 如果没有文件内容可读了,那么就停止
最重要的是是queue_read,会结合queue来准备提交IO
offset 是下次读取文件的位置
queue_read(ring, this_size, offset)
/*
 * Prepare a single vectored read of `size` bytes at file position `offset`
 * on the input descriptor `infd`.
 *
 * The request descriptor and its payload buffer live in one allocation:
 * the buffer starts immediately after the struct io_data header. The
 * descriptor is attached to the submission entry as user data so it can be
 * recovered from the completion. The request is only prepared here, not
 * submitted.
 *
 * Returns 0 on success, 1 if the allocation fails or no submission entry
 * is available (the allocation is released in the latter case).
 */
static int queue_read(struct io_uring *ring, off_t size, off_t offset)
{
    struct io_data *req = malloc(size + sizeof(*req));
    struct io_uring_sqe *entry;

    if (req == NULL)
        return 1;

    /* Grab a free submission queue entry; give the memory back if none is left. */
    entry = io_uring_get_sqe(ring);
    if (entry == NULL) {
        free(req);
        return 1;
    }

    /* Mark the request as a read and record where it starts in the file. */
    req->read = 1;
    req->first_offset = offset;
    req->offset = req->first_offset;

    /* The payload area begins right behind the header. */
    req->iov.iov_base = req + 1;
    req->iov.iov_len = size;
    req->first_len = size;

    /* One iovec, read from infd at the given offset. */
    io_uring_prep_readv(entry, infd, &req->iov, 1, offset);

    /* Attach the descriptor so the completion can be matched to it. */
    io_uring_sqe_set_data(entry, req);
    return 0;
}
// Submit only when the counter changed, i.e. at least one new read was queued.
if (had_reads != reads) {
ret = io_uring_submit(ring);
// a negative result means the submit failed; reporting is elided,
// then the enclosing loop is left
if (ret < 0) {
...
break;
}
}
提交read IO操作
while(write_left)
// Completion handling: got_comp records whether this pass has already waited once.
got_comp = 0;
while (write_left) {
// per-request bookkeeping recovered from the completion below
struct io_data *data;
if (!got_comp) {
// first iteration: wait until a completion entry is available
ret = io_uring_wait_cqe(ring, &cqe);
got_comp = 1;
} else {
...
}
// fetching a completion failed: reporting is elided, then return 1
if (ret < 0) {
...
return 1;
}
// no completion entry to process; leave the loop
if (!cqe)
break;
// recover the user data pointer that was attached to the request
data = io_uring_cqe_get_data(cqe);
// negative res is handled as an error; a res that differs from iov_len
// takes the second branch (both bodies elided here)
if (cqe->res < 0) {
...
} else if ((size_t)cqe->res != data->iov.iov_len) {
...
}
/*
* All done. if write, nothing else to do. if read,
* queue up corresponding write.
*/
// data->read tells a read completion apart from a write completion
if (data->read) {
...
} else {
...
}
// mark this completion entry as consumed
io_uring_cqe_seen(ring, cqe);
}
首先等待一次io读完成(!got_comp), 等待过程中,如果失败, ret < 0,直接退出程序
如果等待过,那么peek 这次IO的完成情况(!got_comp对应的else分支)
// A completion was already waited for in this pass, so only peek for
// further ones instead of waiting again.
ret = io_uring_peek_cqe(ring, &cqe);
// -EAGAIN is treated as "no completion available", not as an error:
// clear cqe and reset ret so the caller's error check does not fire
if (ret == -EAGAIN) {
cqe = NULL;
ret = 0;
}
else if ((size_t)cqe->res != data->iov.iov_len) {
// cqe->res differs from the requested length
/* Short read/write, adjust and requeue */
// NOTE(review): if iov_base is a void *, this += relies on a compiler
// extension for pointer arithmetic - confirm
data->iov.iov_base += cqe->res;
data->iov.iov_len -= cqe->res;
// advance the file offset by the number of bytes already transferred
data->offset += cqe->res;
// queue the remainder of the same request again
queue_prepped(ring, data);
// NOTE(review): the result of this submit is not checked
io_uring_submit(ring);
// mark this completion consumed, then move on to the next one
io_uring_cqe_seen(ring, cqe);
continue;
}
参考资料
在拷贝之前的初始化
setup_context
用来初始化queue
拷贝文件
读写文件的过程
while(insize)
queue_read(ring, this_size, offset)
if (had_reads != reads)
while(write_left)
首先等待一次io读完成(!got_comp), 等待过程中,如果失败, ret < 0,直接退出程序
如果等待过,那么peek 这次IO的完成情况(!got_comp对应的else分支)
如果不是,那么直接退出程序
这时确认有complete entry,那么从complete entry处来获取数据
同样如果读取complete entry读取数据失败,也要区分什么样的错误类型
读取成功了,不是程序指定的大小,比指定的要小,那么需要进一步的读, cqe->res是实际读取的字符数
这时读取指定大小的数据了,那么能开始写了,同时标记这个complete entry已经被处理了
这部分写就完成了,但不代表写一定成功,所以while(writes)
while(writes)
queue_write(ring, data)
queue_prepped
总结
在使用liburing时,