webrtc-rs / webrtc

A pure Rust implementation of WebRTC
https://webrtc.rs
Apache License 2.0
4.1k stars 364 forks source link

on_track is not called when using webrtc-rs as the producer and consumer #408

Open jamal opened 1 year ago

jamal commented 1 year ago

I am trying to write an application that uses webrtc-rs as both the producer and the consumer, but `on_track` is never called after the connection is established. I was able to get both halves working with a browser initiating the offer and the track (using the jsfiddle from the examples). I'm not sure whether this is caused by how I'm using the library or whether it is a bug.

From digging into the internals on the consumer side, I can see that a session is initialized. However, when it reaches `PeerConnectionInternal::start_receiver`, the call to `track.peek` (used to determine the codec) never seems to return. I'm still digging into the initialization on the producer side to see whether I'm missing something or whether something is missing in webrtc-rs. For some reason, this works fine when a browser initiates the track.

I made a quick, hacky attempt to recreate this flow in a standalone application, pasted below. It shows the same issue I'm seeing in my application:

use std::env;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use base64::engine::general_purpose;
use base64::Engine;
use bytes::BytesMut;
use log::debug;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::Notify;
use webrtc::api::media_engine::MIME_TYPE_OPUS;
use webrtc::api::{
    interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
};
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::media::Sample;
use webrtc::rtp_transceiver::rtp_codec::{
    RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
};
use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use webrtc::{
    interceptor::registry::Registry,
    peer_connection::{configuration::RTCConfiguration, RTCPeerConnection},
};

async fn create_peer_connection(add_track: bool) -> Result<Arc<RTCPeerConnection>> {
    let mut m = MediaEngine::default();
    // m.register_default_codecs()?;

    m.register_codec(
        RTCRtpCodecParameters {
            capability: RTCRtpCodecCapability {
                mime_type: MIME_TYPE_OPUS.to_owned(),
                clock_rate: 48000,
                channels: 2,
                sdp_fmtp_line: "".to_owned(),
                rtcp_feedback: vec![],
            },
            payload_type: 111,
            ..Default::default()
        },
        RTPCodecType::Audio,
    )?;

    let mut registry = Registry::new();
    registry = register_default_interceptors(registry, &mut m)?;

    let api = APIBuilder::new()
        .with_media_engine(m)
        .with_interceptor_registry(registry)
        .build();

    let config = RTCConfiguration {
        ..Default::default()
    };

    let pc = Arc::new(api.new_peer_connection(config).await?);

    pc.add_transceiver_from_kind(RTPCodecType::Audio, &[])
        .await?;

    let notify_tx = Arc::new(Notify::new());
    let notify_track = notify_tx.clone();

    pc.on_ice_candidate(Box::new(move |candidate| {
        debug!("ICE Candidate: {:?}", candidate);
        Box::pin(async {})
    }));

    pc.on_ice_connection_state_change(Box::new(move |state| {
        debug!("Connection State changed: {}", state);
        if state == RTCIceConnectionState::Connected {
            println!("notifying track");
            notify_tx.notify_waiters();
        }
        Box::pin(async {})
    }));

    pc.on_ice_gathering_state_change(Box::new(move |state| {
        debug!("ICE Gathering state changed: {:?}", state);
        Box::pin(async {})
    }));

    pc.on_negotiation_needed(Box::new(move || {
        debug!("Negotiation needed");
        Box::pin(async {})
    }));

    pc.on_peer_connection_state_change(Box::new(move |state| {
        debug!("Peer Connection state changed: {:?}", state);
        Box::pin(async {})
    }));

    pc.on_signaling_state_change(Box::new(move |state| {
        debug!("Signaling state changed: {:?}", state);
        Box::pin(async {})
    }));

    let audio_track = Arc::new(TrackLocalStaticSample::new(
        RTCRtpCodecCapability {
            mime_type: MIME_TYPE_OPUS.to_owned(),
            ..Default::default()
        },
        "audio".to_owned(),
        "webrtc-rs".to_owned(),
    ));

    if add_track {
        println!("adding track");

        let rtp_sender = pc.add_track(audio_track.clone()).await?;

        tokio::spawn(async move {
            let mut buf = vec![0u8; 1500];
            while let Ok((_, _)) = rtp_sender.read(&mut buf).await {}
        });

        let frame_size = 2880;
        let channels = 2;
        let max_packet = 1500;
        let mut buf: Vec<f32> = Vec::with_capacity(frame_size * channels);
        buf.resize(frame_size * channels, 0f32);
        let mut out_buf: Vec<u8> = Vec::with_capacity(max_packet);
        out_buf.resize(max_packet, 0u8);

        let sample_rate = 48000f32;
        let mut sample_clock = 0f32;
        let mut next_sample = move || {
            sample_clock = (sample_clock + 1.0) % sample_rate;
            (sample_clock * 440.0 * 2.0 * std::f32::consts::PI / sample_rate).sin()
        };

        let mut enc = opus::Encoder::new(
            sample_rate as u32,
            opus::Channels::Stereo,
            opus::Application::Audio,
        )
        .unwrap();

        tokio::spawn(async move {
            notify_track.notified().await;

            println!("starting to write to track");

            loop {
                // Fill buffer with beep
                for frame in buf.chunks_mut(2) {
                    let value = next_sample();
                    for sample in frame.iter_mut() {
                        *sample = value;
                    }
                }

                let mut out_buf = BytesMut::with_capacity(max_packet);
                out_buf.resize(max_packet, 0u8);
                match enc.encode_float(buf.as_slice(), out_buf.as_mut()) {
                    Ok(_) => {
                        audio_track
                            .write_sample(&Sample {
                                data: out_buf.freeze(),
                                duration: Duration::from_millis(60),
                                ..Default::default()
                            })
                            .await
                            .unwrap();
                    }
                    Err(e) => panic!("error encoding: {:?}", e),
                }
            }
        });
    } else {
        pc.on_track(Box::new(move |track, _| {
            println!("on track");
            Box::pin(async {
                if let Some(track) = track {
                    let codec = track.codec().await;
                    let mime_type = codec.capability.mime_type.to_lowercase();
                    if mime_type == MIME_TYPE_OPUS.to_lowercase() {
                        let channels = match codec.capability.channels {
                            1 => opus::Channels::Mono,
                            _ => opus::Channels::Stereo,
                        };

                        let mut decoder =
                            opus::Decoder::new(codec.capability.clock_rate, channels).unwrap();
                        tokio::spawn(async move {
                            let mut buf = vec![0f32; 192000];
                            loop {
                                let (rtp, _) = track.read_rtp().await.unwrap();
                                println!("read {} bytes from rtc", rtp.payload.len());
                                match decoder.decode_float(
                                    rtp.payload.as_ref(),
                                    buf.as_mut_slice(),
                                    false,
                                ) {
                                    Ok(n) => {
                                        println!("decoded {} bytes", n);
                                    }
                                    Err(e) => eprintln!("decode error: {:?}", e),
                                }
                            }
                        });
                    } else {
                        println!("unsupported track mime-type: {}", mime_type);
                    }
                }
            })
        }));
    }

    Ok(pc)
}

/// Producer half of the demo.
///
/// Creates an offer with a local Opus track and prints the local description as
/// base64-encoded JSON. It then reads the pasted answer from stdin, applies it, and
/// runs until Ctrl-C. If stdin ends before an answer is entered, it returns without
/// closing the connection.
///
/// # Errors
/// Propagates peer-connection, JSON, base64 and stdin errors.
async fn offer() -> Result<()> {
    println!("Creating offer...");

    let pc = create_peer_connection(true).await?;

    let sdp_offer = pc.create_offer(None).await?;
    let mut gathering = pc.gathering_complete_promise().await;
    pc.set_local_description(sdp_offer).await?;
    // Wait for ICE gathering to finish (no trickle ICE) before printing the SDP.
    let _ = gathering.recv().await;

    let local = pc.local_description().await;
    let json = serde_json::to_string(&local)?;
    println!("{}", encode(&json)?);

    print!("Enter answer: ");
    std::io::stdout().flush().unwrap();

    let pasted = match readline().await? {
        Some(line) => line,
        None => return Ok(()),
    };

    pc.set_remote_description(serde_json::from_str(&decode(&pasted)?)?)
        .await?;

    println!("Press ctrl-c to stop");
    let _ = tokio::signal::ctrl_c().await;
    println!();

    pc.close().await?;
    Ok(())
}

/// Consumer half of the demo.
///
/// Reads a base64-encoded JSON offer from stdin, creates the receiving peer
/// connection, and prints the answer as base64-encoded JSON. It then runs until
/// Ctrl-C. If stdin ends before an offer is entered, it returns immediately.
///
/// # Errors
/// Propagates peer-connection, JSON, base64 and stdin errors.
async fn answer() -> Result<()> {
    print!("Enter offer: ");
    std::io::stdout().flush().unwrap();

    let pasted = match readline().await? {
        Some(line) => line,
        None => return Ok(()),
    };

    let pc = create_peer_connection(false).await?;
    pc.set_remote_description(serde_json::from_str(&decode(&pasted)?)?)
        .await?;

    println!("Creating answer...");
    let sdp_answer = pc.create_answer(None).await?;
    let mut gathering = pc.gathering_complete_promise().await;
    pc.set_local_description(sdp_answer).await?;
    // Wait for ICE gathering to finish (no trickle ICE) before printing the SDP.
    let _ = gathering.recv().await;

    let local = pc.local_description().await;
    let json = serde_json::to_string(&local)?;
    println!("{}", encode(&json)?);

    println!("Press ctrl-c to stop");
    let _ = tokio::signal::ctrl_c().await;
    println!();

    pc.close().await?;
    Ok(())
}

/// Entry point.
///
/// The first argument selects the mode: `offer` runs the producer half and `answer`
/// runs the consumer half. On success the process exits with status 0; on failure
/// it panics with the error's debug output. Any other argument, or none, prints
/// "bad arguments" and exits with status 1.
#[tokio::main]
async fn main() {
    env_logger::init();

    let args: Vec<String> = env::args().collect();
    let outcome = match args.get(1).map(String::as_str) {
        Some("offer") => offer().await,
        Some("answer") => answer().await,
        _ => {
            println!("bad arguments");
            std::process::exit(0x1)
        }
    };

    if let Err(e) = outcome {
        panic!("{:?}", e);
    }
    std::process::exit(0);
}

/// Reads one line from stdin without its line terminator, or returns `None` at end
/// of input.
///
/// NOTE(review): a fresh `BufReader` is built on every call, so any bytes it buffers
/// beyond the first line are dropped with it. That is fine for the single call each
/// mode makes here.
async fn readline() -> std::io::Result<Option<String>> {
    let mut lines = BufReader::new(tokio::io::stdin()).lines();
    lines.next_line().await
}

/// Encodes `b` with the URL-safe base64 alphabet so it can be copied through a
/// terminal during manual signaling.
fn encode(b: &str) -> Result<String> {
    let encoded = general_purpose::URL_SAFE.encode(b.as_bytes());
    Ok(encoded)
}

/// Decodes URL-safe base64 text produced by `encode` back into a UTF-8 string.
///
/// Surrounding whitespace is ignored. Input pasted into a terminal often carries
/// stray spaces or carriage returns, which are not valid base64 and would otherwise
/// make decoding fail.
///
/// # Errors
/// Returns an error if the trimmed input is not valid base64 or the decoded bytes
/// are not valid UTF-8.
fn decode(b: &str) -> Result<String> {
    let bytes = general_purpose::URL_SAFE.decode(b.trim())?;
    Ok(String::from_utf8(bytes)?)
}
alcacer0 commented 1 year ago

Hi, in case it helps: in my case I was getting a `negotiationneeded` event when writing to a video track. Performing the signaling (offer/answer exchange) again fixed it.