axboe / liburing

Library providing helpers for the Linux kernel io_uring support
MIT License
2.77k stars 398 forks source link

io_uring read multishot fails when trying to read a tty #1185

Closed rinoandrejohnsen closed 1 month ago

rinoandrejohnsen commented 1 month ago

Hi!

I needed to write a terminal proxy, so when I looked over my options, I figured io_uring with multishot read would be the correct solutions, so I wrote a small program to test it, but that does not seem to work.

Basically, it fails on this:

if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
    fprintf (
      stderr,
      "buffer not set\n"
    );

    return -101;
  }

I am not sure to why, because it should have worked (right?).

See the full code here:

# include <stdio.h>
# include <unistd.h>
# include <stdlib.h>
# include <assert.h>
# include <sys/time.h>
# include <termios.h>
# include <liburing.h>

# define group_id 17
# define buffers_size 4
# define buffer_mask (buffers_size - 1)
# define buffer_size 32

// socat pty,link=/tmp/ttyA,rawer STDIO

static int setup_terminal (
  int terminal_descriptor
) {
  int status = 0;

  struct termios terminal_settings;

  status = tcgetattr (
    terminal_descriptor,
    &terminal_settings
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to get terminal settings %d\n",
      status
    );

    goto exit;
  }

  terminal_settings.c_cflag = CS8 | CREAD | CLOCAL;
  terminal_settings.c_cc [VMIN] = 1;

  status = cfsetospeed (
    &terminal_settings,
    B9600
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set output speed settings %d\n",
      status
    );

    goto exit;
  }

  status = cfsetispeed (
    &terminal_settings,
    B9600
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set input speed settings %d\n",
      status
    );

    goto exit;
  }

  status = tcsetattr (
    terminal_descriptor,
    TCSANOW,
    &terminal_settings
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set terminal settings %d\n",
      status
    );

    goto exit;
  }
exit:
  return status;
}

int main(
  int arguments_size,
  char* arguments []
) {
  int status = 0;

  if (arguments_size < 1) {
    fprintf (
      stderr,
      "expected terminal path to be passed as an argument\n"
    );
  }

  int terminal_descriptor = open (
    arguments [1],
    O_RDWR,
    O_NONBLOCK
  );

  status = isatty (
    terminal_descriptor
  );

  if (status < 0) {
    printf (
      "%s (%d) is not a terminal\n",
      arguments [1],
      terminal_descriptor
    );
  }

  status = setup_terminal (
    terminal_descriptor
  );

  if (status < 0) {
    fprintf (
      stderr,
      "failed setting up terminal\n"
    );

    return status;
  }

  struct io_uring ring;

  status = io_uring_queue_init (
    8,
    &ring,
    0
  );

  if (status < 0) {
    fprintf (
      stderr,
      "queue_init: %d\n",
      status
    );

    return -12;
  }

  struct io_uring_buf_ring* buffer_ring = io_uring_setup_buf_ring (
    &ring,
    buffers_size,
    group_id,
    0,
    &status
  );

  if (!buffer_ring) {
    fprintf (
      stderr,
      "failed buffer ring %d\n",
      status
    );

    return status;
  }

  void* buffer = malloc (buffers_size * buffer_size);

  for (int count = 0; count < buffers_size; count++) {
    void* buffer_segment = buffer + count * buffer_size;

    io_uring_buf_ring_add (
      buffer_ring,
      buffer_segment,
      buffer_size,
      count,
      buffer_mask,
      count
    );
  }

  io_uring_buf_ring_advance (
    buffer_ring,
    buffers_size
  );

  struct io_uring_sqe* sqe = io_uring_get_sqe (
    &ring
  );

  sqe->flags |= IOSQE_BUFFER_SELECT;

  io_uring_prep_read_multishot (
    sqe,
    terminal_descriptor,
    0,
    0,
    group_id
  );

  status = io_uring_submit (
    &ring
  );

  if (status != 1) {
    fprintf (
      stderr,
      "bad submit %d\n",
      status
    );

    return -12;
  }

  struct io_uring_cqe *cqe;

  status = io_uring_peek_cqe (
    &ring,
    &cqe
  );

  if (!status) {
    if (cqe->res == -EINVAL || cqe->res == -EBADF) {
      return -12;
    }
  }

  printf ("  - waiting for data -  \n");

  status = io_uring_wait_cqe (
    &ring,
    &cqe
  );

  if (status) {
    fprintf (
      stderr,
      "wait %d\n",
      status
    );

    return status;
  }

  if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
    fprintf (
      stderr,
      "buffer not set\n"
    );

    return -101;
  }

  if (!(cqe->flags & IORING_CQE_F_MORE)) {
    fprintf (
      stderr,
      "more not set\n"
    );

    return -102;
  }

  printf (
    "all good!"
  );

  io_uring_cqe_seen (
    &ring,
    cqe
  );

  return 0;
}

Run this command in one terminal: 'socat pty,link=/tmp/ttyA,rawer STDIO' and run the program in another. When both are up, type some character - followed by enter - in the socat terminal and see the result in the program terminal.

RogerMarcoHernandez commented 1 month ago

@rinoandrejohnsen Turns out the /tmp/ttyA device, at least as created, doesn't support NOWAIT, hence, as seen in the code below from linux sources, it fallbacks to single-shot mode, within which the IORING_CQE_F_BUFFER and IORING_CQE_F_MORE flags aren't considered anymore.

    949          * If the file doesn't support proper NOWAIT, then disable multishot
    950          * and stay in single shot mode.
    951          */
    952         if (!io_file_supports_nowait(req))
    953                 req->flags &= ~REQ_F_APOLL_MULTISHOT;
    974         if (ret > 0 && req->flags & REQ_F_APOLL_MULTISHOT) {
    975                 /*
    976                  * Put our buffer and post a CQE. If we fail to post a CQE, then
    977                  * jump to the termination path. This request is then done.
    978                  */
    979                 cflags = io_put_kbuf(req, issue_flags);
    980                 rw->len = 0; /* similarly to above, reset len to 0 */
    981
    982                 if (io_req_post_cqe(req, ret, cflags | IORING_CQE_F_MORE)) {
    983                         if (issue_flags & IO_URING_F_MULTISHOT) {
    984                                 /*
    985                                  * Force retry, as we might have more data to
    986                                  * be read and otherwise it won't get retried
    987                                  * until (if ever) another poll is triggered.
    988                                  */
    989                                 io_poll_multishot_retry(req);
    990                                 return IOU_ISSUE_SKIP_COMPLETE;
    991                         }
    992                         return -EAGAIN;
    993                 }
    994         }

However, you can still check the read contents by reading from the beginning of the buffer until cqe->res(# bytes read) if no error occurred.

    147   // If device doesn't support NOWAIT, IORING_CQE_F_BUFFER and IORING_CQE_F_MORE won't be set and it
    148   // will fallback to single-shot mode.
    149   // if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
    150   //   fprintf(stderr, "buffer not set %s\n", strerror(-cqe->res));
    151   //
    152   //   return -101;
    153   // }
    154
    155   // Only set when there is still data to read.
    156   // if (!(cqe->flags & IORING_CQE_F_MORE)) {
    157   //   fprintf(stderr, "more not set %m\n");
    158   //
    159   //   return -102;
    160   // }
    161
    162   printf("all good!\n");
    163   write(1, buffer, cqe->res);
    164
    165   io_uring_cqe_seen(&ring, cqe);
    166
    167   return 0;
    168 }
rinoandrejohnsen commented 1 month ago

@RogerMarcoHernandez Hi! Thank you for looking into this. A couple of questions comes do mind reading your response:

  1. The file supports being polled and read, so what does the If the file doesn't support proper NOWAIT ... mean in terms of multishot not being able to work correctly?
  2. Does the singelshot mean that we need to submit a new read command to the ring?
isilence commented 1 month ago

Right, w/o NOWAIT support it'll get downgraded to oneshot, then the user can issue a new request if necessary.

It's possible to execute it from a separate thread (io-wq) and then continue polling or just reissue in a waiting mode from io-wq, but I don't think there is much interest, all fast IO paths should support NOWAIT.

rinoandrejohnsen commented 1 month ago

I must admit that I am a bit baffled.

I have opened the tty in nonblocking mode and the read call returns -EAGAIN if I ask in a loop. That should be sufficient for this to work(?).

I have another example with epoll that listen for EPOLLIN and that works as expected, so I find it a bit strange that io_uring does not work under the same conditions.

axboe commented 1 month ago

The point is that io_uring needs per-io hints, and you're relying on per-file hints. It would indeed be possible to forego the NOWAIT support IFF the file is opened O_NONBLOCK, and remains so. Unfortunately lots of drivers don't even respect O_NONBLOCK, and that would end in tears on those. We have no way of flagging O_NONBLOCK support in the kernel specifically, instead NOWAIT is used as a general condition where we know it to be safe to do.

This is the difference between "hey it works in this case I tried" and "it's guaranteed to work regardless of file type".

rinoandrejohnsen commented 1 month ago

I see.

Let's imagine that we could set O_NONBLOCK as flag given to io_uring_prep_read_multishot or given to io_uring_register_files. Would that be enough information given to forgo the NOWAIT check in the kernel?

I am thinking that if there aren't enough information available in the kernel, then that information could be given by the api caller.

axboe commented 1 month ago

O_NONBLOCK is, by definition, not a per-io flag. It's a per-file flag. Hence it doesn't make sense to provide that to a per-command operation, in fact io_uring sets the equivalent flag for issue by itself. But io_uring cannot modify file flags, hence it cannot set O_NONBLOCK. That's up to whoever owns the file descriptor.

No matter how you look at it, this is a driver side issue for that particular file type.

rinoandrejohnsen commented 1 month ago

After reading your comments a couple of times, I thought that I should double check my code.

As it turns out, open with a O_NONBLOCK wasn't enough. I had to set O_NONBLOCK with fcntl as well. Now it works perfectly! Sorry to take up your time on such a mistake.

I am adding the working code here, in case someone comes across this and want to test it out.

# include <stdio.h>
# include <unistd.h>
# include <stdlib.h>
# include <assert.h>
# include <sys/time.h>
# include <termios.h>
# include <liburing.h>

# define group_id 17
# define buffers_size 4
# define buffer_mask (buffers_size - 1)
# define buffer_size 32

// socat pty,link=/tmp/ttyA,rawer STDIO

static int setup_terminal (
  int terminal_descriptor
) {
  int status = 0;

  struct termios terminal_settings;

  status = tcgetattr (
    terminal_descriptor,
    &terminal_settings
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to get terminal settings %d\n",
      status
    );

    goto exit;
  }

  terminal_settings.c_cflag = CS8 | CREAD | CLOCAL;
  terminal_settings.c_cc [VMIN] = 1;

  status = cfsetospeed (
    &terminal_settings,
    B9600
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set output speed settings %d\n",
      status
    );

    goto exit;
  }

  status = cfsetispeed (
    &terminal_settings,
    B9600
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set input speed settings %d\n",
      status
    );

    goto exit;
  }

  status = tcsetattr (
    terminal_descriptor,
    TCSANOW,
    &terminal_settings
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set terminal settings %d\n",
      status
    );

    goto exit;
  }
exit:
  return status;
}

int main(
  int arguments_size,
  char* arguments []
) {
  int status = 0;

  if (arguments_size < 1) {
    fprintf (
      stderr,
      "expected terminal path to be passed as an argument\n"
    );
  }

  int terminal_descriptor = open (
    arguments [1],
    O_RDWR,
    O_NONBLOCK
  );

  status = isatty (
    terminal_descriptor
  );

  if (status < 0) {
    printf (
      "%s (%d) is not a terminal\n",
      arguments [1],
      terminal_descriptor
    );
  }

  int flags = fcntl (
    terminal_descriptor,
    F_GETFL,
    0
  );

  status = fcntl (
    terminal_descriptor,
    F_SETFL,
    flags | O_NONBLOCK
  );

  if (status != 0) {
    fprintf (
      stderr,
      "failed to set O_NONBLOCK on terminal %d\n",
      status
    );

    return status;
  }

  status = setup_terminal (
    terminal_descriptor
  );

  if (status < 0) {
    fprintf (
      stderr,
      "failed setting up terminal\n"
    );

    return status;
  }

  struct io_uring ring;

  status = io_uring_queue_init (
    8,
    &ring,
    0
  );

  if (status < 0) {
    fprintf (
      stderr,
      "queue_init: %d\n",
      status
    );

    return -12;
  }

  struct io_uring_buf_ring* buffer_ring = io_uring_setup_buf_ring (
    &ring,
    buffers_size,
    group_id,
    0,
    &status
  );

  if (!buffer_ring) {
    fprintf (
      stderr,
      "failed buffer ring %d\n",
      status
    );

    return status;
  }

  char* buffer = malloc (buffers_size * buffer_size);

  for (int count = 0; count < buffers_size; count++) {
    void* buffer_segment = buffer + count * buffer_size;

    io_uring_buf_ring_add (
      buffer_ring,
      buffer_segment,
      buffer_size,
      count,
      buffer_mask,
      count
    );
  }

  io_uring_buf_ring_advance (
    buffer_ring,
    buffers_size
  );

  struct io_uring_sqe* sqe = io_uring_get_sqe (
    &ring
  );

  sqe->flags |= IOSQE_BUFFER_SELECT;

  io_uring_prep_read_multishot (
    sqe,
    terminal_descriptor,
    0,
    0,
    group_id
  );

  status = io_uring_submit (
    &ring
  );

  if (status != 1) {
    fprintf (
      stderr,
      "bad submit %d\n",
      status
    );

    return -12;
  }

  struct io_uring_cqe *cqe;

  status = io_uring_peek_cqe (
    &ring,
    &cqe
  );

  if (!status) {
    if (cqe->res == -EINVAL || cqe->res == -EBADF) {
      return -12;
    }
  }

  printf ("  - waiting for data -  \n");

  while (1) {
    status = io_uring_wait_cqe (
      &ring,
      &cqe
    );

    if (status) {
      fprintf (
        stderr,
        "wait %d\n",
        status
      );

      return status;
    }

    if (cqe->res < 0) {
      if (cqe->res == -ENOBUFS) {
        fprintf (
          stderr,
          "ran out of buffers ");
      }

      fprintf (
        stderr,
        "error: %d\n",
        cqe->res
      );

      return -101;
    }

    if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
      fprintf (
        stderr,
        "buffer not set\n"
      );

      return -102;
    }

    if (!(cqe->flags & IORING_CQE_F_MORE)) {
      fprintf (
        stderr,
        "more not set\n"
      );

      return -103;
    }

    int buffer_index = cqe->flags >> 16;

    printf (
      "%s\n",
      (char*) buffer_ring->bufs[buffer_index].addr
    );

    io_uring_cqe_seen (
      &ring,
      cqe
    );
  }

  return 0;
}