quinn-rs / quinn

Async-friendly QUIC implementation in Rust
Apache License 2.0
3.76k stars 380 forks source link

When sending video stream, the delay is large. #1267

Closed go-jar closed 2 years ago

go-jar commented 2 years ago

When sending video, the delay is large, which may reach more than ten seconds. Code shows as follows. Does anyone has any suggestions?

#![cfg(feature = "rustls")]

use bytes::{BufMut, Bytes, BytesMut};
use futures_util::StreamExt;
use protobuf::Message;
use quinn::{ClientConfig, Endpoint, Incoming, IncomingUniStreams, NewConnection, ServerConfig};
use rustls;
use std::{
    fs::File,
    io::{Error, BufReader},
    net::SocketAddr,
    sync::Arc,
    str
};
use sodiumoxide::crypto::secretbox::Key;
use tokio::{self, sync::mpsc};
use crate::{anyhow::anyhow, ResultType, time::Instant};
pub use quinn;

const SERVER_NAME: &str = "xremote.autox.tech";
const CHANNEL_LATENCY: i64 = 2_000;
const DROP_MSG: bool = false;

pub(crate) struct Cert<'a> {
    ca_file: &'a str,
    client_cert_file: &'a str,
    client_key_file: &'a str,
    server_cert_file: &'a str,
    server_key_file: &'a str,
}

#[cfg(not(debug_assertions))]
lazy_static::lazy_static! {
    static ref CERT: Cert<'static> = Cert {
        ca_file: "libs/hbb_common/cert/dev/ca.cert",
        client_cert_file: "libs/hbb_common/cert/dev/client.cert",
        client_key_file: "libs/hbb_common/cert/dev/client.key",
        server_cert_file: "libs/hbb_common/cert/dev/server.fullchain",
        server_key_file: "libs/hbb_common/cert/dev/server.rsa",
    };
}

#[cfg(debug_assertions)]
lazy_static::lazy_static! {
    static ref CERT: Cert<'static> = Cert {
        ca_file: "/etc/ssl/xremote/ca.cert",
        client_cert_file: "/etc/ssl/xremote/client.cert",
        client_key_file: "/etc/ssl/xremote/client.key",
        server_cert_file: "/etc/ssl/xremote/server.fullchain",
        server_key_file: "/etc/ssl/xremote/server.rsa",
    };
}

const MAX_BUFFER_SIZE: usize = 128;
type Value = Vec<u8>;
type Sender = mpsc::Sender<(Instant, Value)>;
type Receiver = mpsc::Receiver<(Instant, Value)>;

pub struct Connection {
    pub conn: quinn::Connection,
    pub endpoint: Option<Endpoint>,
    self_sender: Sender,
    out_receiver: Receiver,
}

impl Connection {
    pub async fn new_for_client(
        server_addr:  SocketAddr,
        local_addr:  SocketAddr,
        ms_timeout: u64
    ) -> ResultType<Self> {
        let client_cfg = client::config(CERT.ca_file, CERT.client_cert_file, CERT.client_key_file);
        let mut endpoint = Endpoint::client(local_addr).expect("create client endpoint");
        endpoint.set_default_client_config(client_cfg);

        let connecting = super::timeout(
            ms_timeout,
            endpoint.connect(server_addr, SERVER_NAME).expect("connect to server error")
        ).await??;

        let NewConnection {
            connection,
            uni_streams,
            ..
        } = connecting;

        let (self_sender, self_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        let (out_sender, out_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        tokio::spawn(process_stream(
            connection.clone(),
            uni_streams,
            out_sender,
            self_receiver,
            connection.remote_address()
        ));

        Ok(Connection {
            conn: connection,
            endpoint: Some(endpoint),
            self_sender,
            out_receiver,
        })
    }

    pub async fn new_for_server(conn: quinn::Connecting) -> Self {
        let quinn::NewConnection {
            connection,
            uni_streams,
            ..
        } = conn.await.expect("server create connection error");

        let (self_sender, self_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        let (out_sender, out_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        tokio::spawn(process_stream(
            connection.clone(),
            uni_streams,
            out_sender,
            self_receiver,
            connection.remote_address()
        ));

        Connection {
            conn: connection,
            endpoint: None,
            self_sender,
            out_receiver,
        }
    }

    #[inline]
    pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
        match self.out_receiver.recv().await {
            None => None,
            Some((_, req_bytes)) => {
                let mut bytes = BytesMut::new();
                bytes.put_slice(&req_bytes);
                return Some(Ok(bytes));
            }
        }
    }

    #[inline]
    pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
        if let Ok(res) =
            tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await {
            res
        } else {
            None
        }
    }

    #[inline]
    pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
        self.send_raw(msg.write_to_bytes()?).await
    }

    #[inline]
    pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
        self.self_sender.send((Instant::now(), msg))
            .await
            .map_err(|e| anyhow!("failed to shutdown stream: {}", e))
    }

    pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
        self.send_raw(bytes.to_vec()).await?;
        Ok(())
    }

    #[inline]
    pub fn remote_address(&self) -> SocketAddr {
        self.conn.remote_address()
    }

    #[inline]
    pub async fn shutdown(&self) -> std::io::Result<()> {
        self.conn.close(0u32.into(), b"done");
        // Give the peer a fair chance to receive the close packet
        if let Some(endpoint) = &self.endpoint {
            endpoint.wait_idle().await;
        }
        Ok(())
    }

    pub fn set_raw(&mut self) {
    }

    pub fn set_key(&mut self, _key: Key) {
    }
}

async fn send(instant: Instant, msg: Vec<u8>, conn: quinn::Connection) {
    let latency = instant.elapsed().as_millis() as i64;
    if DROP_MSG && latency as i64 > CHANNEL_LATENCY {
        log::debug!("The duration of the message in the quic sending queue is: {:?}", latency);
        return;
    }

    if let Ok(mut sender_stream) = conn.open_uni().await {
        match sender_stream.write_all(&msg).await {
            Err(e) =>  log::error!("send msg error: {:?}", e),
            _ => {}
        }
        match sender_stream.finish().await {
            Err(e) => log::error!("finish send stream error: {:?}", e),
            _ => {}
        }
    }
}

async fn process_stream(
    conn: quinn::Connection,
    mut uni_streams: IncomingUniStreams,
    out_sender: Sender,
    mut self_receiver: Receiver,
    addr: SocketAddr
) {
    let a = async move {
        loop {
            match self_receiver.recv().await {
                Some((instant, msg)) => {
                    let conn_cloned = conn.clone();
                    tokio::task::spawn(async move {
                        send(instant, msg, conn_cloned).await;
                    });
                }
                None => break,
            }
        }
        log::info!("exit send loop");
        Err::<(), ()>(())
    };

    let b  = async move {
        loop {
            match uni_streams.next().await {
                Some(result) => {
                    match result {
                        Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
                            log::error!("connection terminated by peer {:?}.", &addr);
                            break;
                        }
                        Err(err) => {
                            log::error!("read msg for peer {:?} with error: {:?}", &addr, err);
                            break;
                        }
                        Ok(recv_stream) => {
                            if let Ok(bytes) = recv_stream.read_to_end(usize::max_value()).await {
                                match out_sender.send((Instant::now(), bytes)).await {
                                    Err(_e) => {
                                        log::error!("connection closed");
                                        break;
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }
                }
                None => break,
            }
        }
        log::info!("exit recv loop");
        Err::<(), ()>(())
    };

    let _ = tokio::join!(a, b);
    log::info!("close stream: {}", addr);
}

pub mod server {
    use super::*;

    pub fn new_endpoint(bind_addr: SocketAddr) -> ResultType<(Endpoint, Incoming)> {
        let server_config = config(CERT.server_cert_file, CERT.server_key_file).expect("config quic server error");
        let (endpoint, incoming) = Endpoint::server(server_config, bind_addr)?;
        Ok((endpoint, incoming))
    }

    fn config(certs: &str, key_file: &str) -> Result<ServerConfig, Box<dyn std::error::Error>> {
        let roots = load_certs(certs);
        let certs = roots.clone();
        let mut client_auth_roots = rustls::RootCertStore::empty();
        for root in roots {
            client_auth_roots.add(&root).unwrap();
        }
        let client_auth = rustls::server::AllowAnyAuthenticatedClient::new(client_auth_roots);

        let privkey = load_private_key(key_file);
        let suites = rustls::ALL_CIPHER_SUITES.to_vec();
        let versions = rustls::ALL_VERSIONS.to_vec();

        let server_crypto = rustls::ServerConfig::builder()
            .with_cipher_suites(&suites)
            .with_safe_default_kx_groups()
            .with_protocol_versions(&versions)
            .expect("inconsistent cipher-suites/versions specified")
            .with_client_cert_verifier(client_auth)
            .with_single_cert_with_ocsp_and_sct(certs, privkey, vec![], vec![])
            .expect("bad certificates/private key");

        Ok(quinn::ServerConfig::with_crypto(Arc::new(server_crypto)))
    }
}

pub mod client {
    use super::*;

    pub(crate) fn config(ca_file: &str, certs_file: &str, key_file: &str) -> ClientConfig {
        let cert_file = File::open(&ca_file).expect(&format!("Cannot open CA file: {:?}", ca_file));
        let mut reader = BufReader::new(cert_file);

        let mut root_store = rustls::RootCertStore::empty();
        root_store.add_parsable_certificates(&rustls_pemfile::certs(&mut reader).unwrap());

        let suites = rustls::DEFAULT_CIPHER_SUITES.to_vec();
        let versions = rustls::DEFAULT_VERSIONS.to_vec();

        let certs = load_certs(certs_file);
        let key = load_private_key(key_file);

        let crypto = rustls::ClientConfig::builder()
            .with_cipher_suites(&suites)
            .with_safe_default_kx_groups()
            .with_protocol_versions(&versions)
            .expect("inconsistent cipher-suite/versions selected")
            .with_root_certificates(root_store)
            .with_single_cert(certs, key)
            .expect("invalid client auth certs/key");

        ClientConfig::new(Arc::new(crypto))
    }
}

pub fn load_certs(filename: &str) -> Vec<rustls::Certificate> {
    let certfile = File::open(filename).expect(&format!("cannot open certificate file: {:?}", filename));
    let mut reader = BufReader::new(certfile);
    rustls_pemfile::certs(&mut reader)
        .unwrap()
        .iter()
        .map(|v| rustls::Certificate(v.clone()))
        .collect()
}

pub fn load_private_key(filename: &str) -> rustls::PrivateKey {
    let keyfile = File::open(filename).expect("cannot open private key file");
    let mut reader = BufReader::new(keyfile);

    loop {
        match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
            Some(rustls_pemfile::Item::RSAKey(key)) => return rustls::PrivateKey(key),
            Some(rustls_pemfile::Item::PKCS8Key(key)) => return rustls::PrivateKey(key),
            None => break,
            _ => {}
        }
    }

    panic!(
        "no keys found in {:?} (encrypted keys not supported)",
        filename
    );
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{error::Error, net::SocketAddr};
    use futures_util::StreamExt;

    lazy_static::lazy_static! {
        static ref CERT: Cert<'static> = Cert {
            ca_file: "cert/dev/ca.cert",
            client_cert_file: "cert/dev/client.cert",
            client_key_file: "cert/dev/client.key",
            server_cert_file: "cert/dev/server.fullchain",
            server_key_file: "cert/dev/server.rsa",
        };
    }

    #[tokio::test]
    async fn quic() -> Result<(), Box<dyn Error>> {
        let addr = "127.0.0.1:5000".parse().unwrap();
        tokio::spawn(run_server(addr));
        run_client(addr).await;
        Ok(())
    }

    async fn run_server(addr: SocketAddr) {
        let (_endpoint, mut incoming) = server::new_endpoint(addr).unwrap();
        while let Some(conn) = incoming.next().await {
            tokio::spawn(handle_connection(conn));
        }
    }

    async fn handle_connection(conn: quinn::Connecting) {
        let mut conn = Connection::new_for_server(conn).await;
        println!("[server] client address: {:?}", conn.remote_address());

        if let Some(recv_bytes) = conn.next().await {
            println!("[server] [2] recive: {:?}", recv_bytes);

            println!("[server] [3] send: hello client 1");
            conn.send_raw(b"hello client 1".to_vec())
                .await
                .unwrap_or_else(move |e| println!("failed: {reason}", reason = e.to_string()));
        }

        println!("[server] [5] send: hello client 2");
        conn.send_raw(b"hello client 2".to_vec()).await.unwrap();
        if let Some(resp_bytes) = conn.next().await {
            println!("[server] [8] receive: {:?}", resp_bytes);
        }
    }

    async fn run_client(server_addr: SocketAddr) {
        let local_addr = "127.0.0.1:8888".parse().unwrap();
        let mut conn = Connection::new_for_client(server_addr, local_addr, 1000).await.unwrap();

        println!("[client] [1] send: hello server 1");
        let mut buf = BytesMut::with_capacity(64);
        buf.put(&b"hello server 1"[..]);
        conn.send_raw(b"hello server 1".to_vec()).await.unwrap();
        let resp_bytes = conn.next().await.unwrap();
        println!("[client] [4] receive: {:?}", resp_bytes);

        if let Some(recv_bytes) = conn.next().await {
            println!("[client] [6] recive: {:?}", recv_bytes);

            println!("[client] [7] send: hello server 2");
            conn.send_raw(b"hello server 2".to_vec())
                .await
                .unwrap_or_else(move |e| println!("failed: {reason}", reason = e.to_string()));
        }

        conn.shutdown().await.unwrap();
    }
}
Ralith commented 2 years ago

What data rate are you observing? What data rate did you expect? Where are your transmit and receive tasks waiting?

It looks like you still have a finish().await call, which you may want to adjust per previous discussion in #1265.

go-jar commented 2 years ago

I moved finish().await into a task. Maybe I should remove finish().await and don't need to use task. Now, I use multiple streams to send videos, should I sorted on the receiving end?

go-jar commented 2 years ago

I moved finish().await into a task. Maybe I should remove finish().await and don't need to use tasks. Now, I use multiple streams to send videos, should I sorted on the receiving end?

Ralith commented 2 years ago

A task that does nothing but finish().await indeed serves no purpose.

Refer to the IncomingUniStreams documentation for information on inter-stream ordering.

go-jar commented 2 years ago

A task that does nothing but finish().await indeed serves no purpose.

Refer to the IncomingUniStreams documentation for information on inter-stream ordering.

Thanks!

go-jar commented 2 years ago

I retested under the LAN. I find that uni_streams.next().await takes 1~120ms. RTT is about 2ms.

Ralith commented 2 years ago

If you're stuck waiting on uni_streams.next(), that indicates that the sender hasn't transmitted a new stream yet. What's the sender waiting for?

go-jar commented 2 years ago

If you're stuck waiting on uni_streams.next(), that indicates that the sender hasn't transmitted a new stream yet. What's the sender waiting for?

I see. Thank you.