cloudflare / quiche

🥧 Savoury implementation of the QUIC transport protocol and HTTP/3
https://docs.quic.tech/quiche/
BSD 2-Clause "Simplified" License

Question about using streams #1077

Open Apolexian opened 2 years ago

Apolexian commented 2 years ago

Hi,

I want to have a wrapper around a quiche connection such as:

pub struct QuicListener {
    socket: mio::net::UdpSocket,
    pub connection: Option<Pin<Box<quiche::Connection>>>,
}

And two methods, accept and connect, which essentially use the code from the quiche examples to get the connection to the point where the handshake is complete.

After this I want to use streams to send/receive data on the connection.

I'm having trouble because, as I understand it, the stream_send function initialises the stream on which the data is sent, and stream_recv then reads from this initialised stream.

What seems to be happening in my case is that the stream is initialised for the connection object on the client side, but the server never gets this stream initialised, so reading from it fails with an invalid stream state error.

Is there any example/repo where something similar is done?

For reference:

The server code

The client code

The wrapper

Thanks for any help!
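
As an illustration of the question above: in QUIC, the receiving side does not open a peer-initiated stream itself. The stream state appears only once packets carrying data for that stream have been processed, and quiche exposes the IDs of streams with pending data through `Connection::readable()`. The sketch below is std-only; `MockServerConn` and `drain_readable` are hypothetical names invented for this example and only imitate that behaviour, they are not quiche types.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for the receiving side of a connection (not a quiche type).
struct MockServerConn {
    /// Stream state, created only when a packet carrying stream data is processed.
    streams: BTreeMap<u64, Vec<u8>>,
}

impl MockServerConn {
    /// Imitates processing an incoming packet that carries data for `stream_id`.
    fn recv(&mut self, stream_id: u64, data: &[u8]) {
        self.streams.entry(stream_id).or_default().extend_from_slice(data);
    }

    /// IDs of the streams that currently have data to read.
    fn readable(&self) -> Vec<u64> {
        self.streams
            .iter()
            .filter(|(_, data)| !data.is_empty())
            .map(|(id, _)| *id)
            .collect()
    }

    /// Reading a stream that has not been created yet is an error.
    fn stream_recv(&mut self, stream_id: u64) -> Result<Vec<u8>, String> {
        match self.streams.get_mut(&stream_id) {
            Some(data) => Ok(std::mem::take(data)),
            None => Err(format!("invalid stream state: {}", stream_id)),
        }
    }
}

/// Read from whichever streams have data instead of from a fixed stream ID.
fn drain_readable(conn: &mut MockServerConn) -> HashMap<u64, Vec<u8>> {
    let mut out = HashMap::new();
    for id in conn.readable() {
        if let Ok(data) = conn.stream_recv(id) {
            out.insert(id, data);
        }
    }
    out
}
```

With the real library the same idea would be a loop over `conn.readable()` after each `conn.recv()`, calling `conn.stream_recv()` for every ID it yields.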

LPardue commented 2 years ago

Hi @Apolexian

Are you actually continuing to call quiche's conn.send()/conn.recv(), and the socket's send/recv, after the connection is established?

All stream_send does is queue data up inside the quiche connection; the application still needs to keep managing I/O and driving the quiche connection.
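
A minimal, std-only sketch of that pattern follows. The `PacketSource` trait, `MockConn` and `flush` are hypothetical names invented for this example and are not part of quiche; they imitate only the behaviour described above, where `stream_send` queues data and a separate `send` call yields packets until nothing is left. With the real library, the loop would call `quiche::Connection::send` until it returns `Err(quiche::Error::Done)` and write each packet to the UDP socket.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the packet-producing side of a connection.
/// Returns `None` when there is nothing left to send.
trait PacketSource {
    fn send(&mut self, out: &mut [u8]) -> Option<usize>;
}

/// Mock connection: `stream_send` only queues bytes, it performs no I/O.
#[derive(Default)]
struct MockConn {
    queued: VecDeque<Vec<u8>>,
}

impl MockConn {
    fn stream_send(&mut self, data: &[u8]) {
        self.queued.push_back(data.to_vec());
    }
}

impl PacketSource for MockConn {
    fn send(&mut self, out: &mut [u8]) -> Option<usize> {
        let pkt = self.queued.pop_front()?;
        // The mock truncates oversized chunks; a real connection packetises them.
        let len = pkt.len().min(out.len());
        out[..len].copy_from_slice(&pkt[..len]);
        Some(len)
    }
}

/// Drain every pending packet and hand it to `write_socket`.
/// Returns the number of packets written.
fn flush<C: PacketSource>(conn: &mut C, mut write_socket: impl FnMut(&[u8])) -> usize {
    let mut out = [0u8; 1350];
    let mut packets = 0;
    while let Some(len) = conn.send(&mut out) {
        write_socket(&out[..len]);
        packets += 1;
    }
    packets
}
```

Calling `stream_send` alone leaves the data queued; nothing reaches the peer until something like `flush` runs, and it has to run again whenever the connection may have new packets to emit.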

Apolexian commented 2 years ago

Hi @LPardue ,

Thanks for your reply!

I've tried to use the event loop in my send and receive functions in the same way as the example does, but I'm running into trouble. I think the event loop on the receive side does not have time to poll before the send is made, so the receive function hangs waiting. (If I close and re-run my client, which prompts the event loop, the server receives the data correctly.)

I'm calling these functions after the initial handshake is established (connection.is_established() returns true) and the mio::Poll is in the struct.

Apologies if I'm missing something obvious.
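
One thing that can make a receive loop hang is waiting for socket readiness with no upper bound. The quiche example applications pass `conn.timeout()` as the poll timeout and call `conn.on_timeout()` when it expires, so the loop wakes up and gets a chance to flush pending packets. The sketch below shows the same idea with a blocking `std::net::UdpSocket` instead of mio; `recv_with_timeout` is a hypothetical helper written for this example, not a quiche or mio function.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Wait for one datagram for at most `timeout` (`None` waits indefinitely).
/// Returns `Ok(None)` if the wait timed out, so the caller can run its
/// timeout handling and flush pending packets instead of blocking forever.
fn recv_with_timeout(
    socket: &UdpSocket,
    buf: &mut [u8],
    timeout: Option<Duration>,
) -> io::Result<Option<(usize, SocketAddr)>> {
    // `set_read_timeout` rejects a zero duration, so clamp to at least 1 ms.
    let timeout = timeout.map(|t| t.max(Duration::from_millis(1)));
    socket.set_read_timeout(timeout)?;
    match socket.recv_from(buf) {
        Ok((len, from)) => Ok(Some((len, from))),
        // A timed-out read is reported as WouldBlock on Unix and TimedOut on Windows.
        Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut) => {
            Ok(None)
        }
        Err(e) => Err(e),
    }
}
```

In a wrapper like the one in this thread, an `Ok(None)` result would be the point to call `conn.on_timeout()` and then drain `conn.send()` before waiting again.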

The functions for reference:

Send

pub fn stream_send(&mut self, stream_id: u64, payload: &mut [u8]) -> io::Result<()> {
    let mut out = [0; DEFAULT_MAX_DATAGRAM_SIZE];
    let mut buf = [0; 65535];
    let mut conn = self.connection.take().unwrap();
    let events = mio::Events::with_capacity(1024);
    // Queue the payload on the stream; `true` marks the end of the stream (fin).
    conn.stream_send(stream_id, payload, true).unwrap();
    loop {
        // Read incoming packets. Nothing polls into `events` in this function,
        // so it is always empty and this loop exits on its first check.
        'read: loop {
            if events.is_empty() {
                break 'read;
            }
            let (len, from) = match self.socket.recv_from(&mut buf) {
                Ok(v) => v,
                Err(e) => {
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break 'read;
                    }
                    panic!("recv() failed: {:?}", e);
                }
            };
            let packet = &mut buf[..len];

            // Process potentially coalesced packets
            let recv_info = quiche::RecvInfo { from };
            let _ = match conn.recv(packet, recv_info) {
                Ok(v) => v,
                Err(_) => {
                    continue 'read;
                }
            };
            // Drain (and discard) any data the peer sent on the same stream.
            while let Ok((_, fin)) = conn.stream_recv(stream_id, &mut out) {
                if fin {
                    break 'read;
                }
            }
        }
        // Flush: write every packet the connection has generated to the socket.
        loop {
            let (write, send_info) = match conn.send(&mut out) {
                Ok(v) => v,
                Err(quiche::Error::Done) => {
                    // Nothing left to send.
                    self.connection = Some(conn);
                    return Ok(());
                }
                Err(e) => {
                    // Put the connection back before reporting the error.
                    self.connection = Some(conn);
                    return Err(io::Error::new(io::ErrorKind::Other, e));
                }
            };
            if let Err(e) = self.socket.send_to(&out[..write], &send_info.to) {
                if e.kind() == std::io::ErrorKind::WouldBlock {
                    self.connection = Some(conn);
                    return Ok(());
                }
                panic!("send() failed: {:?}", e);
            }
        }
    }
}
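
A wrapper that moves the connection out of `self` with `take()` has to put it back on every return path, including early error returns. One alternative, sketched below, is to borrow the connection in place with `Option::as_mut`. `Conn` and `Wrapper` are hypothetical stand-ins written for this example (a real wrapper would hold a `quiche::Connection` and a socket), and the empty-payload check exists only to demonstrate an early error return.

```rust
use std::io;

/// Hypothetical stand-in for `quiche::Connection`; it only records payloads.
struct Conn {
    sent: Vec<Vec<u8>>,
}

/// Hypothetical wrapper that owns an optional connection.
struct Wrapper {
    connection: Option<Box<Conn>>,
}

impl Wrapper {
    /// Borrow the connection in place, so every return path leaves it in `self`.
    fn stream_send(&mut self, payload: &[u8]) -> io::Result<()> {
        let conn = self
            .connection
            .as_mut()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "no connection"))?;
        if payload.is_empty() {
            // Early error return: nothing to restore, `conn` was only borrowed.
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty payload"));
        }
        conn.sent.push(payload.to_vec());
        Ok(())
    }
}
```

Because `conn` is a mutable borrow of one field, other fields such as a socket can still be used alongside it within the same method.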

Receive

pub fn stream_recv(&mut self, stream_id: u64, out: &mut [u8]) -> io::Result<usize> {
    let mut buf = [0; 65535];
    // Scratch buffer for outgoing packets, kept separate from `out` so that
    // flushing does not overwrite stream data already read for the caller.
    let mut pkt = [0; DEFAULT_MAX_DATAGRAM_SIZE];
    // Set up the event loop.
    let mut events = mio::Events::with_capacity(1024);
    let mut len_stream = None;
    loop {
        let mut conn = self.connection.take().unwrap();
        let mut done = None;
        // Block until there is a socket event (no timeout).
        self.poll.poll(&mut events, None).unwrap();
        'read: loop {
            if events.is_empty() {
                self.connection = Some(conn);
                break 'read;
            }
            let (len, from) = match self.socket.recv_from(&mut buf) {
                Ok(v) => v,
                Err(e) => {
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        self.connection = Some(conn);
                        break 'read;
                    }
                    panic!("recv() failed: {:?}", e);
                }
            };
            let packet = &mut buf[..len];

            // Process potentially coalesced packets
            let recv_info = quiche::RecvInfo { from };
            let _ = match conn.recv(packet, recv_info) {
                Ok(v) => v,
                Err(_) => {
                    continue 'read;
                }
            };
            // Read stream data until no more is available or the stream has finished.
            while let Ok((read, fin)) = conn.stream_recv(stream_id, out) {
                if fin {
                    // Record the length of the final read.
                    len_stream = Some(read);
                    done = Some(fin);
                    self.connection = Some(conn);
                    break 'read;
                }
            }
        }
        let mut conn = self.connection.take().unwrap();
        // Flush: write every packet the connection has generated to the socket.
        loop {
            let (write, send_info) = match conn.send(&mut pkt) {
                Ok(v) => v,
                Err(quiche::Error::Done) => {
                    self.connection = Some(conn);
                    break;
                }
                Err(e) => {
                    // Put the connection back before reporting the error.
                    self.connection = Some(conn);
                    return Err(io::Error::new(io::ErrorKind::Other, e));
                }
            };
            if let Err(e) = self.socket.send_to(&pkt[..write], &send_info.to) {
                if e.kind() == std::io::ErrorKind::WouldBlock {
                    self.connection = Some(conn);
                    break;
                }
                panic!("send() failed: {:?}", e);
            }
        }
        // Return only once the stream has finished; otherwise poll again.
        if done == Some(true) {
            return Ok(len_stream.unwrap());
        }
    }
}
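
When a stream arrives in several chunks, each read has to write after the previous chunk and the lengths have to be summed, otherwise only the last chunk survives in the output buffer. The sketch below shows one way to accumulate reads until `fin`. `read_until_fin` is a hypothetical helper written for this example, and the `stream_recv` closure stands in for a call such as `conn.stream_recv(stream_id, buf)`, returning `None` when no data is ready.

```rust
/// Read chunks into `out` until the stream reports `fin`, no data is ready,
/// or `out` is full. Returns the total number of bytes read and whether
/// `fin` was seen.
fn read_until_fin(
    mut stream_recv: impl FnMut(&mut [u8]) -> Option<(usize, bool)>,
    out: &mut [u8],
) -> (usize, bool) {
    let mut total = 0;
    while total < out.len() {
        // Hand the reader only the unused tail of the buffer.
        match stream_recv(&mut out[total..]) {
            Some((read, fin)) => {
                total += read;
                if fin {
                    return (total, true);
                }
                if read == 0 {
                    // No progress: stop instead of spinning.
                    break;
                }
            }
            None => break,
        }
    }
    (total, false)
}
```

A caller that gets `(n, false)` back would keep `n` as its offset, go back to driving the connection (receive, timeout, flush), and call the helper again on `&mut out[n..]` once more data has arrived.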