Closed pyhd closed 8 months ago
Do you have a test case I could try? I wrote a basic one just now for testing, I'll include it below. Doesn't seem to show any latency issues, here's some sample output from a box here:
fd=0, packets=309, min_usec=2, max_usec=8
fd=1, packets=320, min_usec=2, max_usec=7
fd=2, packets=311, min_usec=2, max_usec=7
fd=3, packets=285, min_usec=2, max_usec=8
fd=4, packets=321, min_usec=2, max_usec=7
fd=5, packets=326, min_usec=2, max_usec=8
fd=6, packets=310, min_usec=2, max_usec=4
fd=7, packets=360, min_usec=2, max_usec=4
fd=8, packets=306, min_usec=2, max_usec=4
fd=9, packets=319, min_usec=2, max_usec=7
fd=10, packets=341, min_usec=2, max_usec=6
fd=11, packets=283, min_usec=2, max_usec=4
fd=12, packets=296, min_usec=2, max_usec=8
fd=13, packets=324, min_usec=2, max_usec=5
fd=14, packets=301, min_usec=2, max_usec=5
fd=15, packets=295, min_usec=2, max_usec=7
fd=16, packets=309, min_usec=2, max_usec=8
fd=17, packets=313, min_usec=2, max_usec=7
fd=18, packets=314, min_usec=2, max_usec=7
fd=19, packets=328, min_usec=2, max_usec=4
fd=20, packets=318, min_usec=2, max_usec=5
fd=21, packets=298, min_usec=2, max_usec=5
fd=22, packets=324, min_usec=2, max_usec=8
fd=23, packets=338, min_usec=2, max_usec=7
fd=24, packets=288, min_usec=2, max_usec=8
fd=25, packets=311, min_usec=2, max_usec=4
fd=26, packets=308, min_usec=2, max_usec=5
fd=27, packets=310, min_usec=2, max_usec=7
fd=28, packets=329, min_usec=2, max_usec=8
fd=29, packets=319, min_usec=2, max_usec=6
fd=30, packets=299, min_usec=2, max_usec=8
fd=31, packets=287, min_usec=2, max_usec=8
Overall: min_usec=2, max_usec=8
Note that the timing includes doing the write as well on the thread side (and yeah it should do avg/stddev as well, most are around min).
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>
#include <liburing.h>
#define NR_CONNS 32
#define BGID 17
#define BR_MASK (NR_CONNS - 1)
#define NR_PACKETS 10000
/* Payload the writer thread sends down a pipe; main() reads it back via io_uring. */
struct packet {
/* gettimeofday() timestamp taken by the writer just before the write() */
struct timeval t;
/* index of the connection (pipe) this packet was written to */
long id;
};
/*
 * One connection, backed by a pipe(): fds[0] is the read end that main()
 * arms a multishot read on, fds[1] is the write end used by the writer thread.
 */
struct conn {
int fds[2];
};
/*
 * Return the number of microseconds elapsed from *s to *e.
 *
 * The timestamps come from gettimeofday(), which is wall-clock time and
 * may step backwards. If *e is earlier than *s, 0 is returned instead of
 * letting the negative difference wrap to a huge unsigned value, which
 * would otherwise corrupt the min/max latency statistics.
 */
static unsigned long long utime_since(const struct timeval *s,
				      const struct timeval *e)
{
	long long sec = (long long) e->tv_sec - s->tv_sec;
	long long usec = (long long) e->tv_usec - s->tv_usec;

	/* borrow one second so the microsecond part is non-negative */
	if (sec > 0 && usec < 0) {
		sec--;
		usec += 1000000;
	}

	/* end precedes start: report no elapsed time */
	if (sec < 0 || (sec == 0 && usec < 0))
		return 0;

	return (unsigned long long) (sec * 1000000LL + usec);
}
/*
 * Microseconds elapsed between the timestamp in *tv and the current
 * gettimeofday() time.
 */
static unsigned long long utime_since_now(struct timeval *tv)
{
	struct timeval now;

	gettimeofday(&now, NULL);

	return utime_since(tv, &now);
}
static void *thread_fn(void *data)
{
struct conn *conns = data;
int i, ret;
for (i = 0; i < NR_PACKETS; i++) {
int id = rand() % NR_CONNS;
struct conn *c = &conns[id];
struct packet p;
gettimeofday(&p.t, NULL);
p.id = id;
ret = write(c->fds[1], &p, sizeof(p));
if (ret != sizeof(p)) {
fprintf(stderr, "bad packet write %d\n", ret);
return NULL;
}
if (!(i & (NR_CONNS - 1)))
usleep(rand() % 1000);
}
return NULL;
}
/* Per-connection latency statistics accumulated by main(). */
struct elapsed {
/* number of completions received for this connection */
int nr_packets;
/* smallest observed write-to-completion latency, in microseconds */
unsigned long min_time;
/* largest observed write-to-completion latency, in microseconds */
unsigned long max_time;
};
int main(int argc, char *argv[])
{
struct io_uring_buf_ring *br;
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
struct io_uring ring;
struct conn *conns;
struct elapsed state[NR_CONNS];
pthread_t thread;
int i, ret;
void *buf, *tret;
unsigned long max_t, min_t;
struct packet *packets[NR_CONNS];
srand(getpid());
conns = malloc(sizeof(struct conn) * NR_CONNS);
for (i = 0; i < NR_CONNS; i++) {
if (pipe(conns[i].fds) < 0) {
perror("pipe");
return 1;
}
state[i].nr_packets = 0;
state[i].min_time = -1UL;
state[i].max_time = 0;
}
ret = io_uring_queue_init(NR_CONNS, &ring, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
if (ret) {
fprintf(stderr, "queue_init: %d\n", ret);
return 1;
}
br = io_uring_setup_buf_ring(&ring, NR_CONNS, BGID, 0, &ret);
if (!br) {
fprintf(stderr, "failed buffer ring %d\n", ret);
return 1;
}
buf = malloc(sizeof(struct packet) * NR_CONNS * 4);
for (i = 0; i < NR_CONNS; i++) {
packets[i] = buf + i * sizeof(struct packet) * 4;
io_uring_buf_ring_add(br, packets[i], sizeof(struct packet), i, BR_MASK, i);
}
io_uring_buf_ring_advance(br, NR_CONNS);
for (i = 0; i < NR_CONNS; i++) {
sqe = io_uring_get_sqe(&ring);
io_uring_prep_read_multishot(sqe, conns[i].fds[0], 0, 0, BGID);
sqe->user_data = i;
}
ret = io_uring_submit(&ring);
if (ret != NR_CONNS) {
fprintf(stderr, "bad submit %d\n", ret);
return 1;
}
pthread_create(&thread, NULL, thread_fn, conns);
i = 0;
do {
struct elapsed *e;
unsigned long elapsed;
struct packet *p;
int buf_index;
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret) {
fprintf(stderr, "wait %d\n", ret);
break;
}
if (cqe->res != sizeof(struct packet))
printf("size %d\n", cqe->res);
if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
fprintf(stderr, "buffer not set\n");
return 1;
}
if (!(cqe->flags & IORING_CQE_F_MORE)) {
fprintf(stderr, "more not set\n");
return 1;
}
buf_index = cqe->flags >> 16;
assert(buf_index >= 0 && buf_index <= NR_CONNS);
p = packets[buf_index];
elapsed = utime_since_now(&p->t);
e = &state[cqe->user_data];
e->nr_packets++;
if (elapsed > e->max_time)
e->max_time = elapsed;
if (elapsed < e->min_time)
e->min_time = elapsed;
if (p->id != cqe->user_data)
printf("mismatch %ld %ld\n", p->id, (long) cqe->user_data);
io_uring_buf_ring_add(br, p, sizeof(struct packet), buf_index, BR_MASK, 0);
io_uring_buf_ring_advance(br, 1);
io_uring_cqe_seen(&ring, cqe);
} while (++i < NR_PACKETS);
max_t = 0;
min_t = -1UL;
for (i = 0; i < NR_CONNS; i++) {
struct elapsed *e = &state[i];
if (!e->nr_packets)
continue;
printf("fd=%d, packets=%d, min_usec=%lu, max_usec=%lu\n", i,
e->nr_packets, e->min_time, e->max_time);
if (e->min_time < min_t)
min_t = e->min_time;
if (e->max_time > max_t)
max_t = e->max_time;
}
printf("Overall: min_usec=%lu, max_usec=%lu\n", min_t, max_t);
pthread_join(thread, &tret);
return 0;
}
I ran some more testing and discovered that multishot won't retry properly. Let's say you have 32 byte buffers, but there's 64 bytes pending. We'll get the first one, but then not the second part until more data arrives. I'm guessing this is what leads to your issue here.
Are you able to recompile your kernel? If so, there's a patch below, or I can provide a branch you can pull. I don't love the patch, as we'll always be doing an extra retry; ideally the retry would be gated on data availability rather than always being done. I'll see if I can improve it.
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 118cc9f1cf16..6d75cc9c5d88 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -923,7 +923,10 @@ int io_read(struct io_kiocb *req, unsigned int issue_flags)
int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
+ unsigned int __issue_flags = issue_flags;
+ struct io_ring_ctx *ctx = req->ctx;
unsigned int cflags = 0;
+ bool did_io = false;
int ret;
/*
@@ -932,41 +935,59 @@ int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags)
if (!file_can_poll(req->file))
return -EBADFD;
- ret = __io_read(req, issue_flags);
-
/*
- * If we get -EAGAIN, recycle our buffer and just let normal poll
- * handling arm it.
+ * Lock the ring, if it isn't already, as recycling buffers can
+ * otherwise be wasteful.
*/
- if (ret == -EAGAIN) {
+ io_ring_submit_lock(ctx, issue_flags);
+ __issue_flags &= ~IO_URING_F_UNLOCKED;
+
+ do {
+ ret = __io_read(req, __issue_flags);
+
+ /*
+ * Any successful return value keeps the multishot read armed.
+ */
+ if (ret > 0) {
+ /*
+ * Put our buffer and post a CQE. If we fail to post a
+ * CQE, then jump to the termination path. This request
+ * is then done.
+ */
+ cflags = io_put_kbuf(req, __issue_flags);
+ rw->len = 0; /* similarly to below, reset len to 0 */
+ did_io = true;
+
+ if (!io_fill_cqe_req_aux(req,
+ issue_flags & IO_URING_F_COMPLETE_DEFER,
+ ret, cflags | IORING_CQE_F_MORE))
+ break;
+ continue;
+ }
+
/*
* Reset rw->len to 0 again to avoid clamping future mshot
* reads, in case the buffer size varies.
*/
- if (io_kbuf_recycle(req, issue_flags))
+ if (io_kbuf_recycle(req, __issue_flags))
rw->len = 0;
- return -EAGAIN;
- }
- /*
- * Any successful return value will keep the multishot read armed.
- */
- if (ret > 0) {
/*
- * Put our buffer and post a CQE. If we fail to post a CQE, then
- * jump to the termination path. This request is then done.
+ * If we get -EAGAIN, or a zero return and we already did IO
+ * in this loop, recycle our buffer and just let normal poll
+ * handling arm it.
*/
- cflags = io_put_kbuf(req, issue_flags);
- rw->len = 0; /* similarly to above, reset len to 0 */
-
- if (io_fill_cqe_req_aux(req,
- issue_flags & IO_URING_F_COMPLETE_DEFER,
- ret, cflags | IORING_CQE_F_MORE)) {
+ if (ret == -EAGAIN || (!ret && did_io)) {
+ io_ring_submit_unlock(ctx, issue_flags);
if (issue_flags & IO_URING_F_MULTISHOT)
return IOU_ISSUE_SKIP_COMPLETE;
return -EAGAIN;
+ } else if (ret < 0) {
+ break;
}
- }
+ } while (1);
+
+ io_ring_submit_unlock(ctx, issue_flags);
/*
* Either an error, or we've hit overflow posting the CQE. For any
Here's a simpler and cleaner variant. Would be great if you could give it a shot, or let me know if you want me to put it in a branch based on v6.7 that you can just pull.
diff --git a/io_uring/poll.h b/io_uring/poll.h
index ff4d5d753387..bfd93e5ed3a7 100644
--- a/io_uring/poll.h
+++ b/io_uring/poll.h
@@ -24,6 +24,13 @@ struct async_poll {
struct io_poll *double_poll;
};
+static inline void io_poll_multishot_retry(struct io_kiocb *req,
+ unsigned int issue_flags)
+{
+ if (issue_flags & IO_URING_F_MULTISHOT)
+ atomic_inc(&req->poll_refs);
+}
+
int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_poll_add(struct io_kiocb *req, unsigned int issue_flags);
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 118cc9f1cf16..1e2882e144b5 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -18,6 +18,7 @@
#include "opdef.h"
#include "kbuf.h"
#include "rsrc.h"
+#include "poll.h"
#include "rw.h"
struct io_rw {
@@ -962,8 +963,15 @@ int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags)
if (io_fill_cqe_req_aux(req,
issue_flags & IO_URING_F_COMPLETE_DEFER,
ret, cflags | IORING_CQE_F_MORE)) {
- if (issue_flags & IO_URING_F_MULTISHOT)
+ if (issue_flags & IO_URING_F_MULTISHOT) {
+ /*
+ * Force retry, as we might have more data to
+ * be read and otherwise it won't get retried
+ * until (if ever) another poll is triggered.
+ */
+ io_poll_multishot_retry(req, issue_flags);
return IOU_ISSUE_SKIP_COMPLETE;
+ }
return -EAGAIN;
}
}
And here it is committed:
https://git.kernel.dk/cgit/linux/commit/?h=io_uring-6.8&id=b0a354598fd4c732661f82cc50346d590fb2d72d
as that should be easier to deal with than some paste in here...
Here's a simpler and cleaner variant. Would be great if you could give it a shot, or let me know if you want me to put it in a branch based on v6.7 that you can just pull.
diff --git a/io_uring/poll.h b/io_uring/poll.h index ff4d5d753387..bfd93e5ed3a7 100644 --- a/io_uring/poll.h +++ b/io_uring/poll.h @@ -24,6 +24,13 @@ struct async_poll { struct io_poll *double_poll; }; +static inline void io_poll_multishot_retry(struct io_kiocb *req, + unsigned int issue_flags) +{ + if (issue_flags & IO_URING_F_MULTISHOT) + atomic_inc(&req->poll_refs); +} + int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); int io_poll_add(struct io_kiocb *req, unsigned int issue_flags); diff --git a/io_uring/rw.c b/io_uring/rw.c index 118cc9f1cf16..1e2882e144b5 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -18,6 +18,7 @@ #include "opdef.h" #include "kbuf.h" #include "rsrc.h" +#include "poll.h" #include "rw.h" struct io_rw { @@ -962,8 +963,15 @@ int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags) if (io_fill_cqe_req_aux(req, issue_flags & IO_URING_F_COMPLETE_DEFER, ret, cflags | IORING_CQE_F_MORE)) { - if (issue_flags & IO_URING_F_MULTISHOT) + if (issue_flags & IO_URING_F_MULTISHOT) { + /* + * Force retry, as we might have more data to + * be read and otherwise it won't get retried + * until (if ever) another poll is triggered. + */ + io_poll_multishot_retry(req, issue_flags); return IOU_ISSUE_SKIP_COMPLETE; + } return -EAGAIN; } }
On kernel 6.7.2 patched manually, the read_multishot now works as expected. Really appreciate that!
Also, should it be applied to poll_multishot?
On kernel 6.7.2 patched manually, the read_multishot now works as expected. Really appreciate that!
Excellent! Thanks for testing.
Also should it be applied on poll_multishot ?
The poll implementation is edge triggered, so it's really up to the consumer of the poll multishot request to handle this.
This is in Linus's tree and merged into stable trees, so will be available there soon too. Closing.
On kernel 6.7
Normal RTT should be 200-300ms when using a single-shot poll/read that is re-armed after each completion. But with the multishot variants the latency rose to several seconds.
Tested with different combinations:
- poll_multishot with tun device fd, TCP/UDP sockets
- read_multishot with tun device fd

Results (read_multishot with tun device fd + poll_add with UDP):