webrtc-rs / webrtc

A pure Rust implementation of WebRTC
https://webrtc.rs
Apache License 2.0
4.14k stars 367 forks

Sans-IO protocol implementations #230

Open algesten opened 2 years ago

algesten commented 2 years ago

webrtc-rs is currently almost a line-by-line port of Pion. Pion is written in Go, a garbage-collected language, and this causes some awkwardness for webrtc-rs both internally and externally (API-wise). The WebRTC spec is (consciously or unconsciously) quite JavaScript-oriented (another GC language). It relies heavily on callbacks and largely ignores data ownership (or at least is nowhere near Rust's strict ownership rules). Furthermore, Pion is largely async, and webrtc-rs has "inherited" this and uses various methods to stay as close to the original Go code as possible.

This should not be seen as a critique of webrtc-rs, but rather as background to these points of improvement. To get a more "Rusty" webrtc-rs, we could consider a "Sans-IO" protocol implementation.

The advantages of Sans-IO implementations are described in the post linked above. Another point of inspiration is quinn_proto, the synchronous underpinning of quinn, which is async.

Similarly, we could structure webrtc-rs to be Sans-IO/sync at the lower levels and introduce async in the top-level, user-facing API. This might also lead us to API choices other than the JavaScript RTCPeerConnection (which is somewhat awkward in Rust for the reasons stated above).
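To make the idea concrete, here is a minimal sketch of what a Sans-IO state machine can look like in Rust. The `KeepAlive` protocol, the `Input`/`Output` enums and the method names are all hypothetical and are not taken from webrtc-rs, Pion or quinn_proto. The point is only that the state machine never touches a socket or a clock: the caller passes in received bytes and the current time, and asks what to do next.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Everything the caller can feed into the state machine (hypothetical).
pub enum Input<'a> {
    /// A datagram the caller read from its own socket.
    Datagram(&'a [u8]),
    /// The caller's timer fired.
    Timeout,
}

/// Everything the state machine can ask the caller to do (hypothetical).
#[derive(Debug, PartialEq)]
pub enum Output {
    /// Send these bytes on whatever transport the caller owns.
    Transmit(Vec<u8>),
    /// Nothing to send; call `handle_input` with `Input::Timeout` at this instant.
    Timeout(Instant),
}

/// Toy protocol: answer "ping" with "pong", and send our own "ping" after
/// `idle` has passed without any incoming traffic.
pub struct KeepAlive {
    idle: Duration,
    deadline: Instant,
    outbox: VecDeque<Vec<u8>>,
}

impl KeepAlive {
    pub fn new(now: Instant, idle: Duration) -> Self {
        KeepAlive { idle, deadline: now + idle, outbox: VecDeque::new() }
    }

    /// Time is an explicit argument, so the state machine stays deterministic.
    pub fn handle_input(&mut self, now: Instant, input: Input<'_>) {
        match input {
            Input::Datagram(data) => {
                if data == b"ping" {
                    self.outbox.push_back(b"pong".to_vec());
                }
                // Any incoming traffic resets the idle deadline.
                self.deadline = now + self.idle;
            }
            Input::Timeout => {
                if now >= self.deadline {
                    self.outbox.push_back(b"ping".to_vec());
                    self.deadline = now + self.idle;
                }
            }
        }
    }

    /// Returns queued transmissions first, then the next deadline.
    pub fn poll_output(&mut self) -> Output {
        match self.outbox.pop_front() {
            Some(bytes) => Output::Transmit(bytes),
            None => Output::Timeout(self.deadline),
        }
    }
}

/// Drives the state machine with simulated time and no real network.
pub fn demo_keep_alive() -> Vec<Output> {
    let start = Instant::now();
    let mut proto = KeepAlive::new(start, Duration::from_secs(5));
    let mut seen = Vec::new();

    proto.handle_input(start, Input::Datagram(b"ping"));
    seen.push(proto.poll_output()); // the queued "pong"
    seen.push(proto.poll_output()); // the next deadline

    // Pretend five seconds passed without sleeping.
    proto.handle_input(start + Duration::from_secs(5), Input::Timeout);
    seen.push(proto.poll_output()); // our own "ping"
    seen
}
```

Because no I/O or sleeping is involved, the same state machine can be tested with simulated time, as `demo_keep_alive` does, and later be driven by a blocking loop or by an async runtime without changes.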

Discord discussions here:

melekes commented 2 years ago

@rainliu: "as mentioned in https://github.com/webrtc-rs/webrtc/issues/136, SCTP has been actively refactored in branch https://github.com/webrtc-rs/sctp/tree/proto.

The refactoring follows quinn's implementation: https://github.com/quinn-rs/quinn.

sctp-proto: Deterministic state machine of the protocol which performs no I/O internally.
sctp-async: High-level async API based on tokio.

The refactoring philosophy is Sans-I/O Protocol Implementation: https://sans-io.readthedocs.io/how-to-sans-io.html

SCTP is just the first step of refactoring current webrtc-rs stack, followed by DTLS, ICE, etc."
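The proto/async split described in the quote can be sketched roughly as follows. This is not the code from the sctp branch: `LineProto` and `Driver` are hypothetical names, the toy protocol only splits a byte stream into newline-terminated messages, and the I/O layer uses blocking `std::io::Read` instead of tokio so that the example stays self-contained. An async wrapper would have the same shape, with the read call awaited.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// The "proto" layer: a deterministic state machine that performs no I/O.
/// Hypothetical toy protocol: newline-terminated text messages.
#[derive(Default)]
pub struct LineProto {
    buffer: Vec<u8>,
    events: VecDeque<String>,
}

impl LineProto {
    /// Feed bytes that the caller received from somewhere.
    pub fn handle_bytes(&mut self, bytes: &[u8]) {
        self.buffer.extend_from_slice(bytes);
        // Extract every complete line currently in the buffer.
        while let Some(pos) = self.buffer.iter().position(|b| *b == b'\n') {
            let line: Vec<u8> = self.buffer.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&line[..pos]).into_owned();
            self.events.push_back(text);
        }
    }

    /// Returns the next complete message, if any.
    pub fn poll_event(&mut self) -> Option<String> {
        self.events.pop_front()
    }
}

/// The I/O layer: owns the transport and feeds the proto layer.
pub struct Driver<R: Read> {
    reader: R,
    proto: LineProto,
}

impl<R: Read> Driver<R> {
    pub fn new(reader: R) -> Self {
        Driver { reader, proto: LineProto::default() }
    }

    /// Blocks until a full message is available; `Ok(None)` means end of stream.
    pub fn next_message(&mut self) -> io::Result<Option<String>> {
        loop {
            if let Some(msg) = self.proto.poll_event() {
                return Ok(Some(msg));
            }
            // Deliberately tiny buffer so that messages span several reads.
            let mut buf = [0u8; 4];
            let n = self.reader.read(&mut buf)?;
            if n == 0 {
                return Ok(None);
            }
            self.proto.handle_bytes(&buf[..n]);
        }
    }
}

/// Reads all complete messages from an in-memory "transport".
pub fn read_all(input: &[u8]) -> io::Result<Vec<String>> {
    let mut driver = Driver::new(input);
    let mut messages = Vec::new();
    while let Some(msg) = driver.next_message()? {
        messages.push(msg);
    }
    Ok(messages)
}
```

All protocol logic lives in `LineProto`, so swapping the blocking `Driver` for an async one would not touch the state machine.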

thomaseizinger commented 2 years ago

Now that there has been a move towards a mono-repo, I think changing these crates should be a lot easier because one can immediately see the impact on most consumers.

SCTP seems to be a leaf crate with regard to the dependency tree. Has anyone already attempted to port the above branch onto what is now in master?

cameronelliott commented 2 years ago

Been away for a month. Was going to suggest that a mono repo is a good idea for this issue. Just found out it was done 10 days ago. Cool!

k0nserv commented 2 years ago

I think @rainliu is best positioned to answer about the state of the sans-io SCTP implementation.

kraigher commented 1 year ago

I agree that the PeerConnection API with its on_xyz callbacks is very inconvenient to use in Rust and forces a lot of Arc/Mutex usage. It would be nicer to have a channel of events of some kind, or something like https://docs.rs/tokio/latest/tokio/sync/watch/index.html or https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html

I ended up making my own event channel wrapper, which I share here in case it is useful for inspiration. The event channel allows handling peer connection events and WebSocket trickle ICE signalling events together in the same tokio::select loop without any mutex on the WebSocket. I do not know whether it is ultimately better to have all events in one channel or to split them into multiple ones, but for my small use case a single channel worked well.

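// Imports for the std, anyhow, tokio and webrtc-rs items used below.
// `WebSocket` and `Message` are not defined in this snippet and must be
// provided by the surrounding project.
use std::sync::Arc;

use anyhow::anyhow;
use tokio::sync::mpsc::UnboundedReceiver;
use webrtc::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
use webrtc::track::track_remote::TrackRemote;

#[derive(Debug)]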
pub enum RTCEvent {
    IceConnectionState(RTCIceConnectionState),
    IceCandidate(Option<RTCIceCandidate>),
    PeerConnectionState(RTCPeerConnectionState),
    NewTrack(Arc<TrackRemote>),
}

impl RTCEvent {
    /// Provides a convenient API for RTCPeerConnection where events are consumed from a
    /// channel instead of registering callbacks.
    pub async fn channel(conn: &RTCPeerConnection) -> UnboundedReceiver<RTCEvent> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let tx = Arc::new(tx);

        let tx2 = Arc::clone(&tx);
        conn.on_ice_connection_state_change(Box::new(move |s: RTCIceConnectionState| {
            let _ = tx2.send(RTCEvent::IceConnectionState(s));
            Box::pin(async {})
        }));

        let tx2 = Arc::clone(&tx);
        conn.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
            let _ = tx2.send(RTCEvent::PeerConnectionState(s));
            Box::pin(async {})
        }));
        let tx2 = Arc::clone(&tx);
        conn.on_ice_candidate(Box::new(move |s: Option<RTCIceCandidate>| {
            let _ = tx2.send(RTCEvent::IceCandidate(s));
            Box::pin(async {})
        }));

        let tx2 = Arc::clone(&tx);
        conn.on_track(Box::new(
            move |remote: Option<Arc<TrackRemote>>, _rtprecv: Option<Arc<RTCRtpReceiver>>| {
                if let Some(track) = remote {
                    let _ = tx2.send(RTCEvent::NewTrack(track));
                }
                Box::pin(async {})
            },
        ));

        rx
    }
}

pub async fn establish_connection(
    peer_conn: &RTCPeerConnection,
    ws: &mut dyn WebSocket,
    rtc_chan: &mut UnboundedReceiver<RTCEvent>,
) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            msg = ws.next() => {
                match msg {
                    Some(Ok(Message::Text(msg))) => {
                        let candidate = serde_json::from_str::<Option<RTCIceCandidateInit>>(&msg)?;
                        if let Some(candidate) = candidate {
                            log::debug!("Got remote candidate: {:?}", candidate);
                            peer_conn.add_ice_candidate(candidate).await?;
                        } else {
                            log::info!("Got remote end of candidates");
                            peer_conn.add_ice_candidate(RTCIceCandidateInit::default()).await?;
                        }
                    },
                    Some(Err(e)) => {
                        return Err(e.into());
                    }
                    None => {
                        return Err(anyhow!("Websocket closed unexpectedly"));
                    },
                    _ => {
                        // Ignore other ws messages
                    }
                };
            },

            msg = rtc_chan.recv() => {
                match msg {
                    Some(event) => {
                        match event {
                            RTCEvent::IceConnectionState(s) => {
                                log::info!("ICE Connection State has changed {}", s);
                                if s == RTCIceConnectionState::Failed {
                                    return Err(anyhow!("ICE handshake failed"));
                                }
                            }
                            RTCEvent::IceCandidate(candidate) => {
                                if let Some(candidate) = candidate {
                                    let candidate = candidate.to_json()?;
                                    let msg = serde_json::to_string(&candidate)?;
                                    ws.send(Message::Text(msg)).await?;
                                    log::debug!("Sent local candidate {:?}", candidate);
                                } else {
                                    let msg = serde_json::to_string(&serde_json::Value::Null)?;
                                    ws.send(Message::Text(msg)).await?;
                                    log::info!("Sent local end of candidates");
                                }
                            },
                            RTCEvent::PeerConnectionState(s) => {
                                log::info!("Peer Connection State has changed: {}", s);

                                match s {
                                    RTCPeerConnectionState::Failed
                                    | RTCPeerConnectionState::Disconnected
                                    | RTCPeerConnectionState::Closed => {
                                        return Err(anyhow!("Failed to establish connection"));
                                    }
                                    RTCPeerConnectionState::Connected => {
                                        return Ok(());
                                    },
                                    _ => {}
                                };
                            },
                            _ => {
                                // Other events (e.g. NewTrack) are not needed to establish the connection
                            }
                        }
                    },
                    None => {
                        return Err(anyhow!("RTC channel returned None"));
                    }
                }
            }
        }
    }
}

thomaseizinger commented 1 year ago

Whilst that may end up being better for now, non-async state machines would compose even better than that and would likely be more performant and more idiomatic Rust!

Channels are good at the application layer. The issue is that once they are introduced, every layer on top is forced to use them, whether it wants channels or not. It is therefore better not to have them in the public API of a fundamental library like this one.
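A small sketch of the layering argument, using hypothetical names (`Connection`, `Event`, `poll_event`) rather than any existing webrtc-rs API: if the library only exposes a poll-style method, one application can consume events directly while another forwards them into a channel of its own choosing. Going the other way, from a channel-based API back to a plain poll, is not possible without dragging the channel along.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

/// Events produced by the (hypothetical) library state machine.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Connected,
    Data(Vec<u8>),
    Closed,
}

/// Hypothetical library type: no channels, no callbacks, no async.
#[derive(Default)]
pub struct Connection {
    pending: VecDeque<Event>,
}

impl Connection {
    pub fn new() -> Self {
        Connection::default()
    }

    /// Toy packet handling: "SYN" connects, "FIN" closes, anything else is data.
    pub fn handle_packet(&mut self, packet: &[u8]) {
        let event = match packet {
            b"SYN" => Event::Connected,
            b"FIN" => Event::Closed,
            other => Event::Data(other.to_vec()),
        };
        self.pending.push_back(event);
    }

    /// The only way events leave the library.
    pub fn poll_event(&mut self) -> Option<Event> {
        self.pending.pop_front()
    }
}

/// Application A: consumes events directly, with no channel involved.
pub fn count_data_bytes(conn: &mut Connection) -> usize {
    let mut total = 0;
    while let Some(event) = conn.poll_event() {
        if let Event::Data(bytes) = event {
            total += bytes.len();
        }
    }
    total
}

/// Application B: prefers channels and adds one on top, as its own choice.
pub fn forward_to_channel(conn: &mut Connection, tx: &mpsc::Sender<Event>) {
    while let Some(event) = conn.poll_event() {
        // Ignore the error case where the receiver has been dropped.
        let _ = tx.send(event);
    }
}
```

The same adapter could target a tokio channel or a callback instead; the library itself does not need to know.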

algesten commented 1 year ago

Here's a starting point for a Sans I/O implementation. https://github.com/algesten/str0m

thomaseizinger commented 1 year ago

@algesten: "Here's a starting point for a Sans I/O implementation. https://github.com/algesten/str0m"

Amazing! Thank you for starting this! Let me know if you need financial support for pushing this forward.

I can't promise anything, but I have heard from multiple directions that a more flexible and idiomatic WebRTC implementation in Rust would be greatly appreciated.