webrtc-rs / webrtc

A pure Rust implementation of WebRTC
https://webrtc.rs
Apache License 2.0

[Bug?] Unable to display video in browser after relaying h264 video from one peer connection to another #443

Closed Sollimann closed 1 year ago

Sollimann commented 1 year ago

SHORT VERSION: I have a similar implementation in Go using Pion that works. However, it doesn't work in Rust using the webrtc crate. What am I doing wrong?

Context: Hey, Rust/video enthusiasts! I'm currently porting our robots' WebRTC video streaming from Go to Rust to improve the maintainability of our codebase. Essentially, we want to avoid maintaining too many languages at once, so we're trying to stick to Rust as it makes up the majority of our codebase.

Current Flow (See system drawing below):

[Image: system drawing] Note: media only flows from left to right, i.e. from the robots to the frontend UI

[Image] Title: Spot robot from Boston Dynamics

[Image] Title: InRobot UI / Frontend app

The goal of our video streaming setup is to relay the WebRTC video stream from the cameras on Spot (image above) through a software component that we're running on the Spot edge computer called the 'webrtc-controller'. The webrtc-controller sets up a WebRTC peer connection with a third-party video infrastructure provider called Twilio, and basically forwards the RTP packets from Spot to the webrtc-controller, then from the webrtc-controller to Twilio. We can then connect to Twilio from the browser and consume the video stream from there.

Problem + Questions: I have hit a wall with our migration to Rust and could use some help in resolving the issue.

The current issue is that while we're able to successfully do signaling with Spot and signaling with Twilio, and we're successfully able to connect the two peers and start sending packets to Twilio, we are not able to see the video stream in the browser.

  1. Has anyone encountered similar issues with this crate?
  2. Are you able to see any immediate errors/flaws in the code? (the code is a bit disorganized now because I've been testing a lot of stuff)
  3. Any good experiments I can run - other than the ones listed below - that could help identify the core issues? It would be good to have a fresh pair of eyes to take a look at the code and setup and also bounce some ideas off.

What I've tried:

  1. I'm currently able to stream video from a file that uses the H.264 codec to Twilio and display it in the browser. That tells me that Twilio, the Rust WebRTC library, and the Chrome browser support consuming video encoded with H.264.
  2. I have also set up an example where I allow the browser to access my webcam and stream the webcam video, encoded with VP8, back to me over a WebRTC peer connection. I then forward/relay that video to Twilio and consume it in the frontend. That tells me two things: first, our method of relaying RTP packets between two connections works. Second, we can relay video encoded in VP8.
  3. I have also set up an example where I create a peer connection with Spot and write all the RTP packets for audio and video to UDP ports 4002 and 4004. I am then able to successfully display the audio and video in the VLC player by consuming from those ports. That tells me that I'm successfully able to do signaling and consume RTP packets encoded in H.264 from Spot.
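Experiment 3 can be sketched with the standard library alone. This is a minimal sketch, not the code from the repository: `forward_packet` and `sender_socket` are hypothetical helpers, and the sketch assumes each RTP packet has already been serialized to bytes after being read from the remote track.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};

/// Hypothetical helper: sends one already-serialized RTP packet to a local
/// UDP listener (for example VLC opened with an SDP file describing the stream).
pub fn forward_packet(
    socket: &UdpSocket,
    target: SocketAddr,
    rtp_bytes: &[u8],
) -> io::Result<usize> {
    // One RTP packet maps to exactly one UDP datagram.
    socket.send_to(rtp_bytes, target)
}

/// Hypothetical helper: binds an ephemeral local socket used only for sending.
pub fn sender_socket() -> io::Result<UdpSocket> {
    UdpSocket::bind("127.0.0.1:0")
}
```

With a target such as `127.0.0.1:4004`, every packet read from a track would be passed through `forward_packet`, and a player listening on that port should then be able to consume the stream.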

Hypothesis: My hypothesis is that there is a mismatch between the codecs being used. While we're receiving RTP packets encoded with H.264 from the Spot robot, what is received from Spot somehow differs from what the peer connection with Twilio expects to receive. However, I have printed out the codecs of each track and compared the configuration and codecs, and they both seem to match.
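One way to make such a comparison less error-prone than reading printed output is to compare the H.264 `fmtp` parameters programmatically. The following is a simplified sketch under stated assumptions: `parse_fmtp` and `h264_fmtp_compatible` are hypothetical helpers, the check is not the webrtc crate's own matching logic, and any fmtp strings passed in would have to be taken from the actual SDPs.

```rust
use std::collections::BTreeMap;

/// Parses an SDP fmtp parameter string such as
/// "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"
/// into lower-cased key/value pairs.
pub fn parse_fmtp(line: &str) -> BTreeMap<String, String> {
    line.split(';')
        .filter_map(|pair| {
            let (key, value) = pair.trim().split_once('=')?;
            Some((key.trim().to_ascii_lowercase(), value.trim().to_ascii_lowercase()))
        })
        .collect()
}

/// Simplified H.264 compatibility check (an assumption for illustration):
/// both sides must use the same packetization-mode and the same profile
/// part of profile-level-id.
pub fn h264_fmtp_compatible(a: &str, b: &str) -> bool {
    let (a, b) = (parse_fmtp(a), parse_fmtp(b));
    // RFC 6184: packetization-mode defaults to 0 when it is absent.
    let mode = |m: &BTreeMap<String, String>| {
        m.get("packetization-mode")
            .cloned()
            .unwrap_or_else(|| "0".to_string())
    };
    // The first four hex digits carry profile_idc and the constraint flags;
    // the last two carry the level, which is ignored here.
    let profile = |m: &BTreeMap<String, String>| {
        m.get("profile-level-id")
            .and_then(|p| p.get(..4).map(str::to_owned))
    };
    mode(&a) == mode(&b) && profile(&a).is_some() && profile(&a) == profile(&b)
}
```

Feeding it the `fmtp` line of the H.264 codec from each peer connection would flag, for instance, a differing `packetization-mode`, which is easy to overlook when comparing by eye.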

About Spot:

Spot uses an HTTP 1.1 server for signaling, and we're interacting with the server on the local network on Spot.
Spot supports the H264 video codec and the OPUS audio codec. See the Spot webrtc docs.

About Twilio:

PS! I’m including some links to our code base in the comment section if you want to have a look for yourself. Thank you!!

structure

└── src
    ├── cloud
    │   ├── boot.rs
    │   ├── client.rs
    │   └── mod.rs
    ├── error.rs
    ├── lib.rs
    ├── main.rs
    ├── relay.rs
    └── spot
        ├── cert.rs
        ├── client.rs
        ├── mod.rs
        ├── types.rs
        ├── udp.rs
        ├── utils.rs
        └── vlc_debugging.sdp

main.rs

use anyhow::Result;
use rust_webrtc_controller::cloud::client::CogniteVideoClient;
use rust_webrtc_controller::spot::types::{SpotClient, SpotConfig, SpotCredentials};

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let spot_user = "<user-here>";
    let spot_password = "<pass-here>";
    let spot_host = "<ip-here>";

    let mut spot_client = SpotClient::new(SpotConfig {
        host: spot_host.into(),
        port: 31102,
        credentials: SpotCredentials {
            username: spot_user.into(),
            password: spot_password.into(),
        },
    })
    .await;

    // the remote media tracks received from Spot, which we relay to Twilio
    let spot_remote_tracks = spot_client.connect().await?;

    // PS! If the connection between the webrtc-controller and the Twilio/signaling server goes down,
    // it shouldn't be necessary to recreate the local tracks as long as the connection with Spot is intact
    CogniteVideoClient::start_video_stream_forwarding(spot_remote_tracks).await?;
    Ok(())
}

types.rs

use std::sync::Arc;

use tokio::sync::RwLock;
use webrtc::track::{track_local::track_local_static_rtp::TrackLocalStaticRTP, track_remote::TrackRemote};

pub(crate) type RemoteTrack = Arc<webrtc::track::track_remote::TrackRemote>;
pub(crate) type RemoteTracks = Arc<RwLock<Vec<RemoteTrack>>>;

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct SpotConfig {
    pub host: String,
    pub port: u16,
    pub credentials: SpotCredentials,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct SpotCredentials {
    pub username: String,
    pub password: String,
}

pub struct SpotWebrtcProducer {
    pub spot_config: SpotConfig,
    pub api: webrtc::api::API,
}

#[derive(Clone)]
pub struct SpotClient {
    pub spot_config: SpotConfig,
    pub(crate) token: Option<String>,
    pub client: reqwest::Client,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub(crate) struct SpotCamOffer {
    pub(crate) id: i32,
    pub(crate) command: String,
    pub(crate) sdp: String,
}

#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
pub(crate) struct AuthResponse {
    pub(crate) token: String,
}

#[derive(Debug, Clone)]
pub(crate) struct TrackPair {
    pub(crate) source_track: Arc<TrackRemote>,
    pub(crate) sink_track: Arc<TrackLocalStaticRTP>,
}

cloud/client.rs

use cognite_twilio_client::{
    grpc::client::robot_video_client,
    signaling::helpers::{create_heartbeat_request, create_video_tracks_request},
};
use protos::rust::robotics::{
    self, robot_interface_client::RobotInterfaceClient, IceServers, LocalMediaTrack, VideoRequest, VideoResponse,
};
use tokio::sync::Notify;
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use webrtc::{
    api::{interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder},
    ice_transport::{ice_connection_state::RTCIceConnectionState, ice_server::RTCIceServer},
    interceptor::registry::Registry,
    peer_connection::{
        configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
        sdp::session_description::RTCSessionDescription, RTCPeerConnection,
    },
    rtp_transceiver::rtp_codec::RTPCodecType,
    track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal},
};

use std::{cell::RefCell, rc::Rc, sync::Arc};

use crate::{
    cloud::boot::gen_new_token,
    error::VideoError,
    relay::relay_track,
    spot::types::{RemoteTracks, TrackPair},
};

pub struct CogniteVideoClient {}

impl CogniteVideoClient {
    async fn remote_tracks_to_media_engine_object(
        tracks: Vec<&Arc<webrtc::track::track_remote::TrackRemote>>,
    ) -> Result<webrtc::api::API, webrtc::Error> {
        let mut m = MediaEngine::default();
        for remote_track in tracks.iter() {
            if remote_track.kind() == RTPCodecType::Video {
                for codec in remote_track.params().await.codecs {
                    m.register_codec(codec, RTPCodecType::Video).ok();
                }
            } else {
                for codec in remote_track.params().await.codecs {
                    m.register_codec(codec, RTPCodecType::Audio).ok();
                }
            }
        }

        let mut registry = Registry::default();

        // Use the default set of Interceptors
        registry = register_default_interceptors(registry, &mut m)?;

        // Create the API object with the MediaEngine
        let api = APIBuilder::new()
            .with_media_engine(m)
            .with_interceptor_registry(registry)
            .build();

        Ok(api)
    }

    async fn create_peer_connection(
        tracks: Vec<&Arc<webrtc::track::track_remote::TrackRemote>>,
        ice_servers: &IceServers,
        notify_tx: Arc<Notify>,
    ) -> Result<Arc<RTCPeerConnection>, webrtc::Error> {
        // Prepare the configuration
        let ice_servers: Vec<RTCIceServer> = ice_servers
            .servers
            .iter()
            .map(|ice_server| RTCIceServer {
                urls: ice_server.urls.to_owned(),
                username: ice_server.username.to_owned(),
                credential: ice_server.credential.to_owned(),
                ..Default::default()
            })
            .collect();

        let config = RTCConfiguration {
            ice_servers,
            ..Default::default()
        };

        // PS!!! default webrtc api settings might be insufficient for forwarding spot webrtc stream
        let webrtc_api = Self::remote_tracks_to_media_engine_object(tracks).await.unwrap();

        // Create a new RTCPeerConnection
        let peer_connection = Arc::new(webrtc_api.new_peer_connection(config).await?);

        // Create event handlers:
        // logs ICE connection state changes on the Twilio peer connection
        peer_connection.on_ice_connection_state_change(Box::new(move |ics| {
            match ics {
                RTCIceConnectionState::Connected => {
                    log::info!("Connected to Twilio ICE")
                }
                RTCIceConnectionState::Disconnected => {
                    log::warn!("Disconnected from Twilio ICE")
                }
                RTCIceConnectionState::Failed => {
                    log::error!("Twilio ICE failed")
                }
                RTCIceConnectionState::Completed => {
                    log::info!("Twilio ICE completed")
                }
                RTCIceConnectionState::Closed => {
                    log::warn!("Twilio ICE closed")
                }
                _ => {}
            }
            // needed because it wraps an empty async block into a boxed, pinned future that fulfills the type requirements of the event handler
            Box::pin(async {}) // no-op
        }));

        // sets up a callback function for when the WebRTC peer connection state changes
        peer_connection.on_peer_connection_state_change(Box::new(move |pcs| {
            match pcs {
                RTCPeerConnectionState::Connecting => {
                    log::info!("Connecting to Twilio Peer")
                }
                RTCPeerConnectionState::Connected => {
                    log::info!("Connected to Twilio Peer");
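                    // notify_one stores a permit, so the waiter is woken even if it has not
                    // started waiting yet (notify_waiters would drop the signal in that case)
                    notify_tx.notify_one()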
                }
                RTCPeerConnectionState::Disconnected => {
                    log::warn!("Disconnected from Twilio Peer")
                }
                RTCPeerConnectionState::Failed => {
                    log::error!("Twilio Peer connection failed")
                }
                RTCPeerConnectionState::Closed => {
                    log::warn!("Twilio Peer connection closed")
                }
                _ => {}
            };
            // needed because it wraps an empty async block into a boxed, pinned future that fulfills the type requirements of the event handler
            Box::pin(async {}) // no-op
        }));

        Ok(peer_connection)
    }

    pub async fn start_video_stream_forwarding(tracks: RemoteTracks) -> Result<(), VideoError> {
        let (offer_request_tx, offer_request_rx) = tokio::sync::mpsc::channel(10);

        // connect to cloud gRPC api
        let addr = "http://0.0.0.0:10002";
        log::info!("Client connecting to server on address {}", addr);
        let mut client = RobotInterfaceClient::connect(addr).await?;
        let token = gen_new_token().await?;
        let mut signaling_event_stream = robot_video_client(&mut client, &token, offer_request_rx).await?;

        // start by sending a single heartbeat
        let mut heartbeat = 0;
        let heartbeat_request = create_heartbeat_request(heartbeat);
        offer_request_tx.send(heartbeat_request).await.map_err(|e| {
            VideoError::Io(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                format!("Unable to send heartbeat to grpc signaling server: {e:?}"),
            ))
        })?;

        // wait for twilio ice servers response
        let video_response: Option<VideoResponse> = signaling_event_stream.next().await;
        let video_response = video_response.unwrap();
        let ice_servers =
            if let robotics::video_response::Kind::IceServers(ice_servers) = video_response.kind.as_ref().unwrap() {
                ice_servers
            } else {
                return Err(VideoError::Signaling(tonic::Status::internal(
                    "Failed to get Twilio ice servers",
                )));
            };

        // extract remote tracks
        let mut remote_tracks: Vec<&Arc<webrtc::track::track_remote::TrackRemote>> = vec![];
        let tracks_read_lock = tracks.read().await;
        let mut twilio_tracks = Vec::with_capacity(tracks_read_lock.len());
        let mut track_pairs = vec![];
        for remote_track in tracks_read_lock.iter() {
            remote_tracks.push(remote_track);
        }

        // create peer connection and notifiers
        let notify_tx = Arc::new(Notify::new());
        let notify_video = notify_tx.clone();
        let twilio_peer_connection = Self::create_peer_connection(remote_tracks.clone(), ice_servers, notify_tx)
            .await
            .unwrap();

        // async handles
        let handles = Rc::new(RefCell::new(vec![]));
        let handles_clone = Rc::clone(&handles);

        // add local tracks to twilio peer connection
        for remote_track in remote_tracks.iter() {
            let rt = remote_track.to_owned();
            let codec_capability = &rt.params().await.codecs[0].capability;

            // create local track
            println!(
                "\n \n remote track id: {} and stream_id: {} \n \n",
                rt.id().await,
                rt.stream_id().await
            );
            let lt = Arc::new(TrackLocalStaticRTP::new(
                codec_capability.to_owned(),
                rt.id().await,
                rt.stream_id().await,
            ));

            // add local tracks to twilio message
            twilio_tracks.push(LocalMediaTrack {
                enabled: true,
                id: lt.id().to_string(),
                kind: lt.kind().to_string(),
                name: lt.stream_id().to_string(),
                priority: "standard".to_string(),
            });

            // add local track to twilio peer connection
            twilio_peer_connection.add_track(lt.clone()).await?;

            // remember the source/sink pair; the relay tasks are spawned once the connection is up
            track_pairs.push(TrackPair {
                source_track: rt.clone(),
                sink_track: lt.clone(),
            });
        }

        // Create offer and set our local description, this will also start ice candidate gathering
        let offer = twilio_peer_connection.create_offer(None).await.unwrap();
        twilio_peer_connection.set_local_description(offer).await?;

        // Wait for ice candidates to be gathered
        let mut gather_complete = twilio_peer_connection.gathering_complete_promise().await;
        let _ = gather_complete.recv().await;

        // send robot sdp and local tracks to Twilio
        let video_tracks_request: VideoRequest =
            create_video_tracks_request(twilio_tracks, twilio_peer_connection.clone()).await;

        offer_request_tx.send(video_tracks_request).await.map_err(|e| {
            VideoError::Io(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                format!("Unable to send video tracks to grpc signaling server: {e:?}"),
            ))
        })?;

        // wait for twilio sdps response
        let video_response: Option<VideoResponse> = signaling_event_stream.next().await;
        let video_response = video_response.unwrap();
        let twilio_sdp = if let robotics::video_response::Kind::Sdp(sdp) = video_response.kind.as_ref().unwrap() {
            sdp
        } else {
            return Err(VideoError::Signaling(tonic::Status::internal(
                "Failed to get Twilio SDP",
            )));
        };

        // set the Twilio peer SDP
        twilio_peer_connection
            .set_remote_description(RTCSessionDescription::answer(twilio_sdp.to_owned()).unwrap())
            .await?;

        // continue sending heartbeats to keep video session alive
        let mut interval = IntervalStream::new(tokio::time::interval(tokio::time::Duration::from_secs(1)));
        tokio::spawn(async move {
            while interval.next().await.is_some() {
                heartbeat += 1;
                let heartbeat_request = create_heartbeat_request(heartbeat);
                offer_request_tx.send(heartbeat_request).await.unwrap();
            }
        });

        // Wait for connection established
        notify_video.notified().await;

        // Once the signaling process is complete and the WebRTC connection has been established,
        // media should flow between the peers automatically.
        let done_tx = Arc::new(Notify::new());
        let notify_stop_streaming = done_tx.clone();

        for pair in track_pairs {
            let handle = tokio::spawn(relay_track(pair.source_track, pair.sink_track));
            let mut handles_borrow_mut = handles_clone.borrow_mut();
            handles_borrow_mut.push(handle);
        }

        // TODO!! Implement method for stopping stream through grpc signaling bi-directional stream
        tokio::select! {
            _ = notify_stop_streaming.notified() => {
                println!("received done signal!");
            }
            _ = tokio::signal::ctrl_c() => {
                println!("Video streaming stopped with CTRL + C");
            }
        };

        Ok(())
    }
}

relay.rs

use std::sync::Arc;
use tokio::sync::mpsc;
use webrtc::track::{
    track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter},
    track_remote::TrackRemote,
};

pub(crate) async fn relay_track(
    source_track: Arc<TrackRemote>,
    sink_track: Arc<TrackLocalStaticRTP>,
) -> Result<(), std::io::Error> {
    // Create a channel for relaying RTP packets.
    let (tx, mut rx) = mpsc::unbounded_channel::<webrtc::rtp::packet::Packet>();

    // Read RTP packets from the source track.
    let read_future = {
        let tx = tx.clone();
        tokio::spawn(async move {
            while let Ok((rtp, _)) = source_track.read_rtp().await {
                if (tx.send(rtp)).is_err() {
                    break;
                }
            }
        })
    };

    // Write RTP packets to the sink track.
    let write_future = tokio::spawn(async move {
        while let Some(packet) = rx.recv().await {
            if (sink_track.write_rtp(&packet).await).is_err() {
                eprintln!("\n \n \n ERROR FORWARDING RTP PACKAGES \n \n \n");
                break;
            }
        }
    });

    // Wait for both read and write futures to complete.
    let (read_result, write_result) = tokio::join!(read_future, write_future);

    read_result?;
    write_result?;

    Ok(())
}

spot/client.rs

use std::sync::Arc;
use tokio::sync::{Mutex, Notify, RwLock};
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_candidate_type::RTCIceCandidateType;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::{
    ice_transport::ice_candidate::RTCIceCandidate,
    peer_connection::{
        configuration::RTCConfiguration, sdp::session_description::RTCSessionDescription, RTCPeerConnection,
    },
};

use crate::spot::utils::{decode_base64_to_sdp, encode_sdp_to_base64};
use crate::{error::VideoError, wait_for_notifier};

use super::types::{AuthResponse, RemoteTracks, SpotCamOffer, SpotClient, SpotConfig};
// use super::udp::create_local_udp_connections;
use super::utils::default_request_headers;

// Todo: proper error handling, no unwrap
impl SpotClient {
    pub async fn new(spot_config: SpotConfig) -> SpotClient {
        let default_headers = default_request_headers();
        let client = reqwest::Client::builder()
            .default_headers(default_headers)
            .danger_accept_invalid_certs(true) // Add this line to disable SSL verification
            .build()
            .unwrap();

        SpotClient {
            spot_config,
            token: None,
            client,
        }
    }

    /// request jwt access token from spot using the provided credentials
    async fn get_access_token(&mut self) -> Result<AuthResponse, reqwest::Error> {
        let response = self
            .client
            .post(format!(
                "https://{}:{}/auth",
                self.spot_config.host, self.spot_config.port
            ))
            .body(serde_json::to_string(&self.spot_config.credentials).unwrap())
            .send()
            .await?
            .error_for_status()?;

        let auth_response: AuthResponse = response.json().await?;
        self.token = Some(auth_response.to_owned().token);

        Ok(auth_response)
    }

    /// request a WebRTC offer from spot
    async fn get_spot_cam_offer(&mut self) -> Result<SpotCamOffer, reqwest::Error> {
        let token = self.get_access_token().await?.token;

        let url = format!("https://{}:{}/h264.sdp", self.spot_config.host, self.spot_config.port);

        let sdp_resp = self
            .client
            .get(url)
            .bearer_auth(token)
            .send()
            .await?
            .error_for_status()?;

        let mut offer: SpotCamOffer = sdp_resp.json().await?;
        offer.sdp = decode_base64_to_sdp(&offer.sdp).unwrap();
        println!("spot cam sdp: {}", offer.sdp);
        Ok(offer)
    }

    /// answer the spot cam offer with the SDP answer of the webrtc controller
    async fn send_spot_cam_answer(&self, answer: SpotCamOffer) -> Result<(), reqwest::Error> {
        let token = self.token.as_ref().unwrap();
        let response = self
            .client
            .post(format!(
                "https://{}:{}/h264.sdp",
                self.spot_config.host, self.spot_config.port
            ))
            .bearer_auth(token)
            .body(serde_json::to_string(&answer).unwrap())
            .send()
            .await?
            .error_for_status()?;

        println!(
            "answer response {}: {}",
            response.status(),
            response.text().await.unwrap()
        );

        Ok(())
    }

    async fn send_spot_cam_ice_candidates(&self, answer: &SpotCamOffer) -> Result<(), reqwest::Error> {
        let token = self.token.as_ref().unwrap();
        let candidate_resp = self
            .client
            .post(format!(
                "https://{}:{}/h264.sdp",
                self.spot_config.host, self.spot_config.port
            ))
            .bearer_auth(token)
            .body(serde_json::to_string(&answer).unwrap())
            .send()
            .await?
            .error_for_status()?;

        println!(
            "Candidate response {}: {}",
            candidate_resp.status(),
            candidate_resp.text().await.unwrap()
        );

        Ok(())
    }

    /// create a peer connection instance
    async fn create_peer_connection(
        &self,
        spot_cam_offer: SpotCamOffer,
        notify_gathering: Arc<Notify>,
        ice_candidate_functions: Arc<Mutex<Vec<SpotCamOffer>>>,
        notify_tracks: Arc<Notify>,
        twilio_remote_tracks: RemoteTracks,
    ) -> Result<Arc<RTCPeerConnection>, webrtc::Error> {
        // Prepare the configuration
        let config = RTCConfiguration {
            ice_servers: vec![RTCIceServer {
                urls: vec![],
                ..Default::default()
            }],
            ..Default::default()
        };

        // Create a MediaEngine object to configure the supported codec
        let mut m = MediaEngine::default();
        m.register_default_codecs()?;

        let mut registry = Registry::new();

        // Use the default set of Interceptors
        registry = register_default_interceptors(registry, &mut m)?;

        // Create the API object with the MediaEngine
        let api = APIBuilder::new()
            .with_media_engine(m)
            .with_interceptor_registry(registry)
            .build();

        // Create a new RTCPeerConnection
        let spot_peer_connection = Arc::new(api.new_peer_connection(config).await?);

        // set the Session Description of the remote peer, being spot cam
        spot_peer_connection
            .set_remote_description(RTCSessionDescription::offer(spot_cam_offer.sdp).unwrap())
            .await?;

        // Handle incoming tracks on the spot peer connection
        let mut n_tracks = 0;
        // let udp_conns = create_local_udp_connections().await.unwrap();

        // Set a handler for when a new remote track starts, this handler will forward data to
        // our UDP listeners and will also collect the remote spot tracks to be forwarded
        spot_peer_connection.on_track(Box::new(move |track, _| {
            match track {
                None => {
                    // Apparently, `on_track` does not return a nil pointer when all tracks are received
                    // like `on_ice_candidate` does, so it's likely that `None` will never actually be invoked
                    // notify_tracks.notify_one();
                    Box::pin(async {}) // no-op
                }
                Some(spot_remote_track) => {
                    // let spot_remote_track_clone = Arc::clone(&spot_remote_track);
                    n_tracks += 1;
                    log::info!(
                        "{}",
                        format!(
                            "Track has started, of type: ({}) ({})",
                            spot_remote_track.kind(),
                            spot_remote_track.payload_type()
                        )
                    );

                    // Clone the Arcs so the returned future owns its own handles
                    let twilio_remote_tracks_inner = twilio_remote_tracks.clone();
                    let notify_tracks_inner = notify_tracks.clone();

                    // Once at least two remote tracks (video and audio) have been received, we notify the main task
                    let all_tracks_received = n_tracks >= 2;

                    Box::pin(async move {
                        // Take the write lock and push the remote track to the vector
                        twilio_remote_tracks_inner.write().await.push(spot_remote_track);
                        // Notify only after this track has been pushed; the previous un-awaited sleep never delayed anything
                        if all_tracks_received {
                            notify_tracks_inner.notify_one();
                        }
                    })
                }
            }
        }));

        // The callback `on_ice_candidate` is invoked when the local WebRTC peer generates an ICE candidate or when ICE gathering is complete.
        spot_peer_connection.on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
            let ice_candidate_functions_clone = ice_candidate_functions.clone();

            match candidate {
                None => {
                    println!("ICE gathering complete!");
                    notify_gathering.notify_one()
                }
                Some(ice_candidate) => {
                    let cand_typ = match ice_candidate.typ {
                        RTCIceCandidateType::Unspecified => "unspecified",
                        RTCIceCandidateType::Host => "local",
                        RTCIceCandidateType::Srflx | RTCIceCandidateType::Prflx => "server reflexive (STUN)",
                        RTCIceCandidateType::Relay => "relay (TURN)",
                    };

                    println!("found a candidate of type: {cand_typ}");

                    // https://github.com/webrtc-rs/webrtc/blob/5859550cac046376bd4c7a252e79199c54f44bb8/examples/examples/rtp-forwarder/rtp-forwarder.rs#L303
                    // Convert the RTCIceCandidate to RTCIceCandidateInit
                    let ice_candidate_init = ice_candidate.to_json().unwrap();

                    // Base64 encode the candidate string
                    println!("ice candidate init: {ice_candidate_init:?}");
                    let candidate_body = SpotCamOffer {
                        id: spot_cam_offer.id,
                        command: "icecandidate".to_string(),
                        sdp: encode_sdp_to_base64(&ice_candidate_init.candidate),
                    };
                    tokio::spawn(async move { ice_candidate_functions_clone.lock().await.push(candidate_body) });
                }
            }
            Box::pin(async {})
        }));

        // logs ICE connection state changes on the Spot peer connection
        spot_peer_connection.on_ice_connection_state_change(Box::new(move |f| {
            println!("Spot Ice connection state changed {f}");
            // needed because it wraps an empty async block into a boxed, pinned future that fulfills the type requirements of the event handler
            Box::pin(async {}) // no-op
        }));

        // sets up a callback function for when the WebRTC peer connection state changes
        spot_peer_connection.on_peer_connection_state_change(Box::new(move |f| {
            println!("Spot Peer connection state changed {f}");
            if f == RTCPeerConnectionState::Connected {
                println!("connected!");
                // Notify video streamer that the connection is up
            }
            // needed because it wraps an empty async block into a boxed, pinned future that fulfills the type requirements of the event handler
            Box::pin(async {}) // no-op
        }));

        Ok(spot_peer_connection)
    }

    /// establish a peer connection between webrtc controller and spot cam, and return the remote Spot tracks
    pub async fn connect(&mut self) -> Result<RemoteTracks, VideoError> {
        let spot_cam_offer = self.get_spot_cam_offer().await.unwrap();

        // create ice candidate notifier
        let notify_gathering_tx = Arc::new(Notify::new());
        let all_ice_candidates_gathered = notify_gathering_tx.clone();

        // create spot remote track notifier
        let notify_tracks_tx = Arc::new(Notify::new());
        let all_tracks_received = notify_tracks_tx.clone();
        let twilio_remote_tracks = Arc::new(RwLock::new(vec![]));
        let twilio_remote_tracks_clone = twilio_remote_tracks.clone();

        // create a peer connection
        let ice_candidates = Arc::new(Mutex::new(vec![]));
        let pc = self
            .create_peer_connection(
                spot_cam_offer.clone(),
                notify_gathering_tx,
                ice_candidates.clone(),
                notify_tracks_tx,
                twilio_remote_tracks,
            )
            .await
            .map_err(|e| {
                VideoError::Io(std::io::Error::new(
                    std::io::ErrorKind::ConnectionRefused,
                    format!(
                        "Unable to establish peer connection between webrtc controller and Spot due to error: {e:?}"
                    ),
                ))
            })?;

        // generate a Session Description for the local peer, being the webrtc controller
        let answer = pc.create_answer(None).await.unwrap();
        pc.set_local_description(answer)
            .await
            .expect("Failed to set local description");

        // Wait until all candidates have been gathered
        wait_for_notifier!(all_ice_candidates_gathered);

        // answer back to spot with the Session Description of the webrtc controller
        let answer_sdp = pc.local_description().await.unwrap().sdp;

        let spot_cam_answer = SpotCamOffer {
            id: spot_cam_offer.id,
            command: "answer".to_string(),
            sdp: encode_sdp_to_base64(&answer_sdp),
        };

        // send answer sdp
        println!("\n \n answer sdp: \n {answer_sdp}");
        self.send_spot_cam_answer(spot_cam_answer)
            .await
            .expect("failed to send spot answer");

        println!("finished answer!");
        // send ice candidates
        for ic in ice_candidates.lock().await.iter() {
            if let Err(e) = Self::send_spot_cam_ice_candidates(self, ic).await {
                println!("Failed to send ICE candidate: {}", e);
            }
        }

        // wait for all remote spot tracks to arrive
        println!("wait for all tracks");
        wait_for_notifier!(all_tracks_received);

        Ok(twilio_remote_tracks_clone)
    }
}

Sollimann commented 1 year ago

Also, a few observations that I have made visiting chrome://webrtc-internals/ while streaming:

  1. The ICE candidate pair (CPW/3CP+dN_T0B6L3Gv) is in a "succeeded" state, which means the connection has been established correctly.
  2. There is no issue with the audio stream, as indicated by the "audioLevel" and "totalAudioEnergy" statistics, which show non-zero values.
  3. The video stream seems to be problematic. The "framesReceived", "framesDecoded", and "framesDropped" statistics are all zero, indicating that no video frames are being received or decoded. Also, the "jitterBufferEmittedCount" is zero for the video track (DEPRECATED_TI23), which means no video frames are being emitted from the jitter buffer.
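One further experiment for a zero "framesDecoded" count is to check, on the relaying side, whether the forwarded H.264 packets carry parameter sets and keyframes at all. The following is an illustrative sketch and not part of the code above: `nal_unit_types` and `carries_keyframe_data` are hypothetical helpers that inspect an RTP payload according to the RFC 6184 packetization rules, covering only single NAL unit, STAP-A and FU-A packets.

```rust
/// Returns the NAL unit types carried by one H.264 RTP payload (RFC 6184).
/// Handles single NAL unit packets (types 1-23), STAP-A (24) and FU-A (28);
/// any other packet type yields an empty list.
pub fn nal_unit_types(payload: &[u8]) -> Vec<u8> {
    let first = match payload.first() {
        Some(byte) => *byte,
        None => return Vec::new(),
    };
    match first & 0x1f {
        t @ 1..=23 => vec![t],
        24 => {
            // STAP-A: one header byte, then repeated [16-bit size][NAL unit].
            let mut types = Vec::new();
            let mut rest = &payload[1..];
            while rest.len() > 2 {
                let size = u16::from_be_bytes([rest[0], rest[1]]) as usize;
                if size == 0 || rest.len() < 2 + size {
                    break;
                }
                types.push(rest[2] & 0x1f);
                rest = &rest[2 + size..];
            }
            types
        }
        // FU-A: the original NAL type sits in the low 5 bits of the FU header.
        28 if payload.len() > 1 => vec![payload[1] & 0x1f],
        _ => Vec::new(),
    }
}

/// True when the payload contains an SPS (7), PPS (8) or IDR slice (5),
/// which a decoder needs before it can emit its first frame.
pub fn carries_keyframe_data(payload: &[u8]) -> bool {
    nal_unit_types(payload).iter().any(|t| matches!(*t, 5 | 7 | 8))
}
```

Counting how often `carries_keyframe_data` returns true for the payloads being relayed would show whether keyframes ever reach the sink track. If they do, the problem is more likely downstream of the relay.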

Sollimann commented 1 year ago

@rainliu @regexident any suggestions? :)

Sollimann commented 1 year ago

(SOLVED) It turns out the code was correct; however, our corporate network firewall was blocking WebRTC traffic.
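A quick way to test for this kind of blocking is to send a STUN Binding Request over UDP and see whether an answer comes back. The sketch below uses only the standard library and is written under stated assumptions: `binding_request`, `is_binding_success` and `udp_reachable` are hypothetical helpers, the STUN server address must be supplied by the caller, and a successful answer on the STUN port does not guarantee that the media ports are open as well.

```rust
use std::io;
use std::net::UdpSocket;
use std::time::Duration;

/// STUN magic cookie defined by RFC 5389.
const MAGIC_COOKIE: [u8; 4] = [0x21, 0x12, 0xA4, 0x42];

/// Builds a 20-byte STUN Binding Request with no attributes.
pub fn binding_request(transaction_id: [u8; 12]) -> [u8; 20] {
    let mut msg = [0u8; 20];
    msg[0..2].copy_from_slice(&[0x00, 0x01]); // message type: Binding Request
    // msg[2..4] stays zero: message length, no attributes follow
    msg[4..8].copy_from_slice(&MAGIC_COOKIE);
    msg[8..20].copy_from_slice(&transaction_id);
    msg
}

/// True when `response` is a Binding Success Response for `transaction_id`.
pub fn is_binding_success(response: &[u8], transaction_id: &[u8; 12]) -> bool {
    response.len() >= 20
        && response[0..2] == [0x01u8, 0x01]
        && response[4..8] == MAGIC_COOKIE
        && response[8..20] == transaction_id[..]
}

/// Sends one Binding Request over UDP and waits briefly for the answer.
/// A timeout suggests (but does not prove) that UDP is being filtered.
pub fn udp_reachable(stun_server: &str, timeout: Duration) -> io::Result<bool> {
    let socket = UdpSocket::bind("0.0.0.0:0")?;
    socket.set_read_timeout(Some(timeout))?;
    let id = *b"probe-000001"; // 12 bytes; should be random in real use
    socket.send_to(&binding_request(id), stun_server)?;
    let mut buf = [0u8; 512];
    match socket.recv_from(&mut buf) {
        Ok((n, _)) => Ok(is_binding_success(&buf[..n], &id)),
        // Read timeouts surface as WouldBlock or TimedOut depending on the platform.
        Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut) => {
            Ok(false)
        }
        Err(e) => Err(e),
    }
}
```

Running `udp_reachable` once from inside the corporate network and once from an unrestricted network, against the same STUN server, would give a first indication of whether a firewall is dropping the traffic.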