quinn-rs / quinn

Async-friendly QUIC implementation in Rust
Apache License 2.0
3.76k stars 380 forks source link

How to send video stream? #1260

Closed go-jar closed 2 years ago

go-jar commented 2 years ago

I want to send vedio stream using quinn. But every time I send a video stream by calling open_uni(), it is always failed. I think it because the encryption is too time-consuming. So I move open_uni() to a thread like follows.

const MAX_BUFFER_SIZE: usize = 128;
type Value = Vec<u8>;
type Sender = mpsc::Sender<Value>;
type Receiver = mpsc::Receiver<Value>;

async fn process_stream(
    conn: quinn::Connection,
    mut uni_streams: IncomingUniStreams,
    out_sender: Sender,
    mut self_receiver: Receiver,
    addr: SocketAddr
) {
    let a = async move {
        loop {
            match self_receiver.recv().await {
                Some(msg) => {
                    if let Ok(mut sender_stream) = conn.open_uni().await {
                        match sender_stream.write_all(&msg).await {
                            Err(e) =>  log::error!("send msg error: {:?}", e),
                            _ => {}
                        }
                        match sender_stream.finish().await {
                            Err(e) => log::error!("finish send stream error: {:?}", e),
                            _ => {}
                        }
                    }
                }
                None => break,
            }
        }
        Err::<(), ()>(())
    };

    let b  = async move {
        loop {
            match uni_streams.next().await {
                Some(result) => {
                    match result {
                        Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
                            log::error!("connection terminated by peer {:?}.", &addr);
                            break;
                        }
                        Err(err) => {
                            log::error!("read msg for peer {:?} with error: {:?}", &addr, err);
                            break;
                        }
                        Ok(recv_stream) => {
                            if let Ok(bytes) = recv_stream.read_to_end(usize::max_value()).await {
                                out_sender.send(bytes).await.unwrap();
                            }
                        }
                    }
                }
                None => break,
            }
        }
        Err::<(), ()>(())
    };

    let _ = tokio::join!(a, b);
    log::info!("close stream: {}", addr);
}

And it works. Is there a better way?

djc commented 2 years ago

How did it fail? You cannot write the whole video stream (assuming it is decently sized) without interleaving with the peer reading from that stream, since streams can contain limited buffered data before raising an error. The solution of using separate tasks for the reader and the writer seems like a fine solution for this.

go-jar commented 2 years ago

How did it fail? You cannot write the whole video stream (assuming it is decently sized) without interleaving with the peer reading from that stream, since streams can contain limited buffered data before raising an error. The solution of using separate tasks for the reader and the writer seems like a fine solution for this.

Thanks!