cmazakas closed this issue 1 month ago
The general recommendation has always been "don't share a ring between threads". It's of course possible to do so with some caveats:
1) You can have one thread doing submissions and another reaping completions. That should work fine, without any added synchronization.
2) If you have multiple threads submitting IO, for example, you need to serialize access to that.
3) Any non-single-thread/task access to a ring will eliminate some optimizations that could otherwise get enabled, like IORING_SETUP_SINGLE_ISSUER or IORING_SETUP_DEFER_TASKRUN.
In general, if you can avoid sharing a ring, you should do so.
> In general, if you can avoid sharing a ring, you should do so.
Agreed.
But to help guide users, what would a good multi-threaded TCP server look like?
Obviously, I'm volunteering as tribute to do all the legwork. I just wanna make sure visions align on what's idiomatic and good quality.
There are various ways to do that. You could have one thread handling connections with its own ring, using multishot accept direct for example. Then you can pass connections to other threads, each with their private ring, passing the direct descriptor along with it, using IORING_MSG_RING for example. Those threads would generally already be processing work and would see it next time they wait for events, or they are already waiting for events and would get woken by the CQE that's posted from the IORING_MSG_RING. That assumes you have enough load that the main thread handles nothing but accepts; it could of course also handle connections beyond accept.
Outside of that, I suspect most of the general recommendations for networking would be covered in the doc I wrote recently:
https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023
which is, however, not really touching on the topic of threading and io_uring. The key for multi-threading, as mentioned, is that you really don't want to share the ring.
Would probably be useful to write the basic code illustrating this, to go along with the guide?
> Would probably be useful to write the basic code illustrating this, to go along with the guide?
Yup!
I actually don't even know what IORING_MSG_RING is. But the general gist of your explanation sounds perfect to me. A single pinned thread for the accept() loop, passing off request handling to a dedicated set of resources.
This is what most users will wind up wanting to model their own servers after.
IORING_MSG_RING is documented, and its liburing helpers too! But the tldr is that it's a way to send messages to other rings, and it can also be used to pass a direct file descriptor to another ring.
Ah, I see. It's IORING_OP_MSG_RING. I've now also found all the relevant man page entries.
Alright, there's a lot for me to study here, it seems. Kind of nice that there's so much more to keep learning about liburing. This udp example you have is quite excellent, btw.
Curiously, would you be opposed to a C++ example?
Imo, it'd help demonstrate the features of liburing a bit more without being bogged down by C. For example, compare std::thread to the pthreads interface.
Unless there's a strong reason to use C++ rather than C (and I don't think pthreads is strong enough), then I would greatly prefer it to be in C as it's much easier to read for a wide audience. That said, if you feel strongly that C++ is much better AND you can write it in clean and easy to read C++, then obviously you get to pick as you're the one doing the work :-)
Hmm, I'm actually having a heck of a time getting just a basic rote liburing example to work with accept_direct().
For example, I'm doing something like this:
auto fds = std::array<int, 16>{};
fds.fill(-1);
constexpr auto const fixed_idx = 7u;
auto test_fd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
fds[fixed_idx] = test_fd;
ret = ::io_uring_register_files(&ring, fds.data(), fds.size());
handle_error(ret);
{
  auto sqe = ::io_uring_get_sqe(&ring);
  ::io_uring_prep_accept_direct(sqe, accept_fd, nullptr, nullptr, 0, fixed_idx);
  sqe->user_data = 1;
  ::io_uring_submit(&ring);
}
So, we create a mostly blank FD array; I'm guessing I need to put the ::socket() FD into there. The accept call actually completes with cqe->res == 0, which is expected, but actually trying to write through the direct FD using the fixed_idx causes a SIGPIPE error when I use it like this:
auto sqe = ::io_uring_get_sqe(&ring);
sqe->user_data = 3;
static char const buf[] = "Hello, world!";
::io_uring_prep_write(sqe, fds[fixed_idx], buf, sizeof(buf), 0);
::io_uring_submit(&ring);
I must be using this direct interface incorrectly. I tried basing this stuff off of the tests in liburing but I must be doing something wrong here.
Is there a good template I should follow here? Maybe I'll just copy-paste-edit the tests into something that works.
You need to set IOSQE_FIXED_FILE in sqe->flags if fd is set to a direct descriptor. If you don't, io_uring will assume it's a normal file descriptor. As we support both types, you need to be explicit about which one you want to use.
Ha, that's a really good catch.
You know what I realized I was doing incorrectly though?
I basically copy-pasted your accept.c test and just pared it down. I realized: I was expecting io_uring to actually mutate the fd array I supplied it.
The way I thought it worked was that direct accept would mutate the registered fd array at the file_idx location. But it doesn't! What you're supposed to do is pass the file_idx as the fd to all the relevant functions!
Once I started doing that, my code started working perfectly. Not sure why I thought what I did but at least I figured it out.
As always, thank you for your time.
Alright, I have something small-ish that showcases how to get direct FDs from accept() and then transport them across threads, even properly closing them with the corresponding ring.
But I'm not sure it's a good fit for the liburing repo, the more I think about it.
I'm going to include the example code anyway, but I'm not sure if it should be merged. C++ doesn't belong, and there are so many other trade-offs to make that I think it's better for something like this to be its own blog post or series.
However, I do think it's useful for people like me (runtime implementors) to have a set of guidelines or commandments to follow. That's something where usually the author is the foremost expert.
#include <liburing.h>
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <string_view>
#include <thread>
#include <vector>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>
#include <unistd.h>
inline constexpr unsigned num_clients = 2;
void handle_error(int ret) {
  if (ret != 0) {
    printf("error %d: %s\n", ret, strerror(-ret));
  }
  assert(ret == 0);
}

void handle_errno(int ret) {
  if (ret == -1) {
    auto e = errno;
    std::cout << strerror(e) << std::endl;
    exit(1);
  }
}

__kernel_timespec make_timespec(__kernel_time64_t tv_sec, long long tv_nsec) {
  __kernel_timespec ts;
  ts.tv_sec = tv_sec;
  ts.tv_nsec = tv_nsec;
  return ts;
}
struct client_socket {
  int fd = -1;

  client_socket() : fd{socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)} {
    handle_errno(fd);
  }

  ~client_socket() {
    assert(fd >= 0);
    close(fd);
  }

  int as_raw_fd() const { return fd; }
};

struct io_uring_context {
  io_uring io_ring;

  io_uring_context() {
    auto ret = io_uring_queue_init(32, get_ring(), 0);
    handle_error(ret);
  }

  ~io_uring_context() { io_uring_queue_exit(get_ring()); }

  io_uring *get_ring() { return &io_ring; }
};

sockaddr_in make_localhost_ipv4() {
  sockaddr_in addr;
  ::memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = ::inet_addr("127.0.0.1");
  addr.sin_port = ::htons(3030);
  return addr;
}
void client_thread() {
  auto ret = 0;

  auto io_ctx = io_uring_context();
  auto addr = make_localhost_ipv4();
  auto client = client_socket();
  auto fd = client.as_raw_fd();

  ret = connect(fd, (sockaddr const *)&addr, sizeof(addr));
  handle_errno(ret);
  printf("done with client connect\n");

  auto msg = std::string("hello, world!");
  ret = write(fd, msg.data(), msg.size());
  handle_errno(ret);
  printf("done with client write (%d bytes)\n", ret);

  auto buf = std::vector<unsigned char>(1024);
  ret = read(fd, buf.data(), buf.size());
  handle_errno(ret);
  printf("done with client read\n");

  close(fd);

  auto response = std::string(buf.data(), buf.data() + ret);
  printf("client transaction processed successfully\n%s\n", response.c_str());
}
struct server_socket {
  int fd = -1;

  server_socket() : fd{socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)} {
    handle_errno(fd);

    auto val = 1u;
    auto ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
    handle_errno(ret);

    auto addr = make_localhost_ipv4();
    ret = ::bind(fd, (sockaddr *)&addr, sizeof(addr));
    handle_errno(ret);

    ret = ::listen(fd, 128);
    handle_errno(ret);
  }

  ~server_socket() {
    assert(fd != -1);
    close(fd);
  }

  int as_raw_fd() const { return fd; }
};

struct registered_files {
  std::vector<int> fds;
  io_uring *ring;

  registered_files(io_uring *ring_, std::size_t n) : fds(n, -1), ring(ring_) {
    auto ret = io_uring_register_files(ring, fds.data(), fds.size());
    handle_error(ret);
  }

  ~registered_files() { io_uring_unregister_files(ring); }
};

struct worker {
  io_uring_context io_ctx;
  registered_files fds;

  worker() : io_ctx(), fds(io_ctx.get_ring(), num_clients) {}

  int as_raw_fd() { return io_ctx.get_ring()->ring_fd; }
};
void accept_thread() {
  auto ret = 0;

  auto io_ctx = io_uring_context();
  auto *ring = io_ctx.get_ring();

  auto fds = registered_files(ring, num_clients);
  auto server = server_socket();

  auto workers = std::vector<worker>(num_clients);
  auto worker_threads = std::vector<std::thread>();
  for (auto idx = 0u; idx < workers.size(); ++idx) {
    worker_threads.emplace_back([ring = workers[idx].io_ctx.get_ring()] {
      auto ret = 0;
      auto buf = std::vector<unsigned char>(1024);
      auto msg = std::string();
      auto fd = -1;

      while (true) {
        io_uring_cqe *cqe = nullptr;
        ret = io_uring_wait_cqe(ring, &cqe);
        handle_error(ret);

        printf("received cqe on worker thread!\n");

        if (cqe->res < 0) {
          handle_error(cqe->res);
        }

        if (cqe->user_data == 1337) {
          fd = cqe->res;
          io_uring_cqe_seen(ring, cqe);
          {
            auto *sqe = io_uring_get_sqe(ring);
            io_uring_prep_read(sqe, fd, buf.data(), buf.size(), 0);
            sqe->user_data = 1;
            sqe->flags |= IOSQE_FIXED_FILE;
          }
          io_uring_submit(ring);
          printf("prepped read\n");
          continue;
        }

        if (cqe->user_data == 1) {
          auto const num_read = cqe->res;
          io_uring_cqe_seen(ring, cqe);
          {
            msg = std::string("server received message, echoing: ") +
                  std::string(buf.begin(), buf.begin() + num_read);

            auto *sqe = io_uring_get_sqe(ring);
            io_uring_prep_write(sqe, fd, msg.data(), msg.size(), 0);
            sqe->user_data = 2;
            sqe->flags |= IOSQE_FIXED_FILE;
          }
          io_uring_submit(ring);
          printf("prepped write\n");
          continue;
        }

        if (cqe->user_data == 2) {
          io_uring_cqe_seen(ring, cqe);
          {
            auto *sqe = io_uring_get_sqe(ring);
            io_uring_prep_close_direct(sqe, fd);
            sqe->user_data = 3;
          }
          io_uring_submit(ring);
          printf("prepped close\n");
          continue;
        }

        if (cqe->user_data == 3) {
          break;
        }
      }
    });
  }

  {
    auto *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept_direct(sqe, server.as_raw_fd(), nullptr, nullptr, 0,
                                IORING_FILE_INDEX_ALLOC);
    sqe->user_data = 1337;
    io_uring_submit(ring);
  }

  for (auto num_connected = 0u; num_connected < num_clients;) {
    io_uring_cqe *cqe = nullptr;
    ret = io_uring_peek_cqe(ring, &cqe);
    if (ret != 0) {
      ret = io_uring_wait_cqe(ring, &cqe);
      handle_error(ret);
    }

    if (cqe->user_data == 1337) {
      printf("accepted client fd: %d\n", cqe->res);
      if (cqe->res < 0) {
        handle_error(cqe->res);
      }

      {
        printf("invoking prep_msg with %d\n", cqe->res);
        auto *sqe = io_uring_get_sqe(ring);
        io_uring_prep_msg_ring_fd_alloc(
            sqe, workers[num_connected % workers.size()].as_raw_fd(), cqe->res,
            1337, 0);
        sqe->user_data = 7331;
      }

      {
        auto *sqe = io_uring_get_sqe(ring);
        io_uring_prep_accept_direct(sqe, server.as_raw_fd(), nullptr, nullptr,
                                    0, IORING_FILE_INDEX_ALLOC);
        sqe->user_data = 1337;
      }
      io_uring_submit(ring);

      ++num_connected;
    }

    io_uring_cqe_seen(ring, cqe);
  }

  for (auto &w : worker_threads) {
    w.join();
  }
}
int main() {
  auto t1 = std::thread(accept_thread);
  std::this_thread::sleep_for(std::chrono::seconds(1));

  auto t2 = std::thread(client_thread);
  auto t3 = std::thread(client_thread);

  t1.join();
  t2.join();
  t3.join();
}
I'm noticing that there are no authoritative "from the source" docs outlining the best practices for anyone aiming to use liburing in a multi-threaded scenario.
We should first begin by defining what the actual recommended way is (or even just define useful templates for end-users); then we can craft the examples, and perhaps a section in the man pages.