denoland / fastwebsockets

A fast RFC6455 WebSocket implementation
https://docs.rs/fastwebsockets/
Apache License 2.0

io_uring #31

Open littledivy opened 1 year ago

Boolin365 commented 9 months ago

Would be really cool to be able to integrate monoio & fastwebsockets - any eta for this?

littledivy commented 9 months ago

No one is working on this at the moment. Contributions are welcome :)

raashidanwar commented 7 months ago

Hey @littledivy,

I would love to work on this. Would be glad if you give me some more context.

parastrom commented 6 months ago

Hyper 1.0 & io_uring - I think they explain the problem quite clearly. From what I've seen/tinkered with, to ensure safety we need to generate a static reference from whatever buffer we're dealing with for network IO via io_uring. The main problem is ensuring the buffer's lifetime during asynchronous operations with io_uring. Two approaches come to mind: Pinning/Boxing the buffer or handling ownership directly. Pinning (Pin<[u8; N]>) has nuances I'm not entirely clear about in this context, and Boxing (Box<[u8; N]>) incurs performance overhead.
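To make the "'static reference" idea concrete: the naive version is just an unsafe lifetime extension, shown below purely as an illustration of what the caller would have to promise (it is only sound if the backing storage outlives every pending io_uring submission that was handed the slice).

unsafe fn extend_lifetime<'a>(buf: &'a [u8]) -> &'static [u8] {
  // SAFETY (caller-enforced): the memory behind `buf` must stay alive and unmodified
  // until the kernel has completed every operation that was given this slice.
  std::mem::transmute::<&'a [u8], &'static [u8]>(buf)
}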

Given that the buffer implementation must comply with monoio::buf::IoBuf, the Payload::Owned variant seems like the only feasible option. This is because it's not bound to the external lifetime of fastwebsockets::Frame, unlike other Payload variants. However, we could consider unsafely extending the lifetime of any Payload variant with monoio::buf::{RawBuf, RawBufVectored}, though this raises safety concerns.
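As a small illustration of why Payload::Owned is the convenient case, here's a hypothetical helper (not part of the crate) that normalizes any payload into an owned buffer before handing it to a completion-based write. It assumes Payload derefs to [u8], so the non-owned variants pay an extra copy - exactly the cost the borrowed variants exist to avoid.

fn into_owned_bytes(payload: fastwebsockets::Payload<'_>) -> Vec<u8> {
  match payload {
    // Already owns its Vec<u8>, so it can be moved straight into the runtime.
    fastwebsockets::Payload::Owned(v) => v,
    // Borrowed variants must be copied to sever the external lifetime.
    other => other.to_vec(),
  }
}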

Here's a naive translation of fastwebsockets::Frame::writev for Linux targets with the uring feature flag. It uses monoio traits for asynchronous write operations:

#[cfg(all(target_os = "linux", feature = "uring"))]
  pub async fn writev<S>(
    &mut self,
    stream: &mut S,
  ) -> Result<(), std::io::Error>
  where
    S: AsyncWriteRentExt + Unpin,
  {
    use monoio::buf::{RawBufVectored, RawBuf};
    use std::io::IoSlice;

    let mut head = [0; MAX_HEAD_SIZE];
    let size = self.fmt_head(&mut head);

    let total = size + self.payload.len();

    let mut b = [IoSlice::new(&head[..size]), IoSlice::new(&self.payload)];

    // SAFETY:
    // 1. `Frame`, and therefore `Payload`, must outlive `RawBufVectored` so that the memory region
    //    remains valid for the duration of the async write operation.
    // 2. Given "1", the pointer and the length remain valid: the `head` slice is dropped later than
    //    `raw`, and the payload lives as long as `Frame`.
    // IMPORTANT: It is the caller's responsibility to ensure that `Frame` isn't dropped and `Payload`
    // isn't modified while the operation remains pending - failure to guarantee this may cause UB.
    let raw = unsafe { RawBufVectored::new(b.as_ptr() as *const _, b.len()) };

    let (res, _) = stream.write_vectored_all(raw).await;
    let mut n = res?;

    if n == total {
      return Ok(());
    }

    while n <= size {
      b[0] = IoSlice::new(&head[n..size]);
      let raw = unsafe {
        RawBufVectored::new(b.as_ptr() as *const _, b.len())
      };
      let (res, _) = stream.write_vectored_all(raw).await;
      n += res?;
    }

    if n < total && n > size {
      let offset = n - size;
      let raw = unsafe {
        RawBuf::new(self.payload[offset..].as_ptr() as *const _, self.payload.len() - offset)
      };
      let (res, _) = stream.write_all(raw).await;
      match res {
          Ok(_) => {},
          Err(e) => return Err(e)
      }
    }
    Ok(())
  }
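For reference, a rough usage sketch of the above under a monoio runtime - assuming monoio::net::TcpStream satisfies the AsyncWriteRentExt + Unpin bound, and keeping the frame alive across the await as the SAFETY contract requires:

#[cfg(all(target_os = "linux", feature = "uring"))]
async fn send_close(stream: &mut monoio::net::TcpStream) -> Result<(), std::io::Error> {
  // `frame` (and therefore its payload) is neither dropped nor modified until the
  // write future has completed, which is what the SAFETY comments rely on.
  let mut frame = fastwebsockets::Frame::close(1000, b"closing");
  frame.writev(stream).await
}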

The translation above works in basic tests, but it's not extensively tested and might not hold up under thorough scrutiny. It's essential to note that using RawBufVectored and RawBuf unsafely assumes the payload's immutability during the async operation, which might not always be the case. On a per-user basis, if you can determine that everything you will encounter in your application is the Payload::Owned variant, then you could make the monoio traits work, since that variant owns its buffer - but if not, i.e. the payload goes through this conditional:

  async fn parse_frame_header<'a, S>(&mut self, stream: &mut S) -> Result<Frame<'a>, WebSocketError> {
  ...
    let payload = if payload.len() > self.writev_threshold {
      Payload::BorrowedMut(payload)
    } else {
      Payload::Owned(payload.to_vec())
    };
  }

The threshold is there for performance purposes (?), so I'm unsure how to approach this without either making an assumption about the end-user's usage pattern or staying agnostic to that process. Regardless, I think that following the Hyper route and having owned IO traits which we can implement on top of tokio's / monoio's traits is a possible solution, and that's what I'm working on now - but then we still run into the problem of compatibility between monoio and hyper (which may be easier now with Hyper 1.0).
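To sketch that direction (all names hypothetical, loosely following hyper 1.0's move to its own I/O traits): an owned-write trait that moves the buffer in and hands it back with the result, so a readiness-based runtime like tokio can adapt to it trivially and a completion-based runtime like monoio could back it with its rent-style API.

use std::io;

// Hypothetical trait for illustration; not an existing fastwebsockets or monoio API.
pub trait OwnedAsyncWrite {
  // Ownership of the buffer moves in and comes back with the result, so the buffer
  // cannot be freed while a completion-based runtime still references it.
  async fn write_owned(&mut self, buf: Vec<u8>) -> (io::Result<usize>, Vec<u8>);
}

// Tokio adapter: a plain borrowed write underneath; ownership is simply handed back.
impl<T> OwnedAsyncWrite for T
where
  T: tokio::io::AsyncWrite + Unpin,
{
  async fn write_owned(&mut self, buf: Vec<u8>) -> (io::Result<usize>, Vec<u8>) {
    use tokio::io::AsyncWriteExt;
    let res = self.write(&buf).await;
    (res, buf)
  }
}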

littledivy commented 5 months ago

@parastrom, thanks for looking into this!

I agree that owned I/O traits are probably the best way forward.

I'm open to adding a new API for owned payloads specific to io_uring and exposing it under an experimental feature flag. It won't be portable and that's probably fine; we can work on the io_uring APIs without touching the epoll/kqueue code paths.
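For concreteness, one way the gating could look - the flag name and layout below are placeholders, not a decided design:

// Cargo.toml (sketch):
//   [features]
//   unstable-uring = ["dep:monoio"]

// lib.rs (sketch): the owned-payload io_uring API only compiles on Linux with the flag
// enabled, so the existing epoll/kqueue code paths are left completely untouched.
#[cfg(all(target_os = "linux", feature = "unstable-uring"))]
mod uring {
  // owned Frame / WebSocket write APIs would live here
}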