quinn-rs / quinn

Async-friendly QUIC implementation in Rust
Apache License 2.0
3.85k stars 394 forks source link

Getting consistent ConnectionReset errors when writing to a stream #1296

Closed suhrm closed 2 years ago

suhrm commented 2 years ago

Hi I am current trying to use quinn in an application where the I would like to allocate quic bidi streams to specific application streams. I have got a client server up and running using the example and docs provide. However after a certain number of transmission between the client and server I receive an Err Io(Custom { kind: ConnectionReset, error: Stopped(0) }) on subsequent writes to the stream, As far as I am aware of I have nothing that should explicitly reset the connection in my application.

Furthermore the stream that fails is id 0, I remember that this was the crypto stream a long time ago and could be the issue but I have not really followed the resent developments of QUIC.

The client/server connection handlers have the following structure

Client

pub async fn runner(&mut self) {
        let mut sstreams: HashMap<
            quinn::StreamId,
            FramedWrite<quinn::SendStream, BinCodec<Packet>>,
        > = HashMap::new();
        let mut rstreams = futures::stream::select_all(Vec::new());
        loop {
            tokio::select! {
                Some(tun_pkt) = self.send_ch.recv() => {
                    // TODO: Create new stream pair VID not taken by stream present if it is not present in sstreams
                    // Check QoS in tunnelTransmission before attempting transmission
                    let cmp_sid = quinn::VarInt::from_u32(tun_pkt.qos.vid as u32);
                    if sstreams.contains_key(&cmp_sid.into()) {
                        let tmp_s = sstreams.get_mut(&cmp_sid.into()).unwrap();
                        tmp_s.send(tun_pkt.pkt.clone()).await.unwrap();
                    }
                    else {

                        println!("Opening new stream {:?}", self.qconn.stable_id());
                        match self.qconn.open_bi().await {
                            Ok((s,r)) => {
                            let sid = s.id();
                            let ss = FramedWrite::new(s, BinCodec::<Packet>::new());
                            let rs  = FramedRead::new(r, BinCodec::<Packet>::new());
                            sstreams.insert(sid, ss);
                            rstreams.push(rs);

                            },
                            Err(err) => {println!("Error creating new stream {:?}", err);},

                        }
                    }
                },
                // TODO: Handle datagrams properly
                Some(Ok(datagram)) = self.datagrams.next() => {
                    println!("Got datagram from server: {:?}", datagram)
                },
                Some(Ok(pkt)) = rstreams.next() => {
                    self.recv_ch.send(pkt);
                }
            }
        }
    }

Server:

pub async fn runner(&mut self) {
        //Runner for incomming client connection
        let mut sstreams: HashMap<
            quinn::StreamId,
            FramedWrite<quinn::SendStream, BinCodec<Packet>>,
        > = HashMap::new();
        let mut rstreams = futures_util::stream::select_all(Vec::new());
        loop {
            tokio::select! {
                Ok(tun_pkt) = self.ch_out.recv() => {
                    let cmp_sid = quinn::VarInt::from_u32(tun_pkt.qos.vid as u32);
                    if sstreams.contains_key(&cmp_sid.into()) {
                        let tmp_s = sstreams.get_mut(&cmp_sid.into()).unwrap();
                        match tmp_s.send(tun_pkt.pkt.clone()).await {
                            Ok(_) => (),
                            Err(res) => println!("{:?}, {:?}", tmp_s.get_ref().id(), res),

                        }
                    }
                },
                Some(Ok(datagram)) = self.qconn.datagrams.next() => {
                    println!("Got datagram {:?}", datagram);
                },

                Some(Ok((s, r))) = self.qconn.bi_streams.next() => {
                    println!("new incomming stream");
                    let sid = s.id();
                    let ss = FramedWrite::new(s, BinCodec::<Packet>::new());
                    let rs  = FramedRead::new(r, BinCodec::<Packet>::new());
                    rstreams.push(rs);
                    sstreams.insert(sid, ss);
                },

                Some(Ok(pkt)) = rstreams.next() => {
                       // Handle received packet
                },
            }
        }
    }

Thanks in advance.

Regards.

Rasmus Suhr Mogensen

Ralith commented 2 years ago

QUIC errors don't map very gracefully into io::Error. For clearer diagnostics, look at the Display output for the error, or downcast to the underlying quinn error, or avoid converting into io::Error in the first place.

It looks like your real error is quinn::WriteError::Stopped, which is documented as:

The peer is no longer accepting data on this stream

Note that this does not imply anything about the state of the connection, which may well be doing perfectly fine.

Streams are stopped when the receiver calls RecvStream::stop or when the RecvStream is dropped, so you should investigate why that might be happening on your server.

suhrm commented 2 years ago

Thanks for the quick reply I will take a look at the things you have suggested :).

suhrm commented 2 years ago

Turned out the library i used for codecs in FrameWrite/FrameRead has a bug which throws an error that causes the RecvStream on the server to be dropped. After changing to LengthDelimitedCodec the drop does not occur.

Thanks again for the 'quic' response.