livekit / rust-sdks

LiveKit realtime and server SDKs for Rust
https://livekit.io
Apache License 2.0

capture_frame never completes #408

Closed jkears closed 3 months ago

jkears commented 3 months ago

I have repurposed the Unity SDK for LiveKit into a new .NET client SDK. It issues requests through the LiveKit FFI and works amazingly well, except for one major issue.

We have our own internal framework, which includes our own AI Assistant solution. When a chat completion is converted to audio and pushed to an audio source as a capture frame, it works for a while, but more often than not the native libwebrtc audio source call seems to block forever, and I am not sure why.

I know the hang occurs in or beneath this function because the CaptureAudioFrame event that the caller sends afterwards is never raised ...

let res = source.capture_frame(&audio_frame).await;
let _ = server.send_event(proto::ffi_event::Message::CaptureAudioFrame(
                        proto::CaptureAudioFrameCallback {
                            async_id,
                            error: res.err().map(|e| e.to_string()),
                        },
                    ));

In this method ...

 pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
        if self.sample_rate != frame.sample_rate || self.num_channels != frame.num_channels {
            return Err(RtcError {
                error_type: RtcErrorType::InvalidState,
                message: "sample_rate and num_channels don't match".to_owned(),
            });
        }
        let mut inner = self.inner.lock().await;
        let mut samples = 0;
        // split frames into 10ms chunks
        loop {
            let remaining_samples = frame.data.len() - samples;
            if remaining_samples == 0 {
                break;
            }
            if (inner.len != 0 && remaining_samples > 0) || remaining_samples < self.samples_10ms {
                let missing_len = self.samples_10ms - inner.len;
                let to_add = missing_len.min(remaining_samples);
                let start = inner.len;
                inner.buf[start..start + to_add]
                    .copy_from_slice(&frame.data[samples..samples + to_add]);
                inner.len += to_add;
                samples += to_add;
                if inner.len == self.samples_10ms {
                    let data = inner.buf.clone().to_vec();
                    let _ = self.po_tx.send(data).await;
                    inner.len = 0;
                }
                continue;
            }
            if remaining_samples >= self.samples_10ms {
                // TODO(theomonnom): avoid copying
                let data = frame.data[samples..samples + self.samples_10ms].to_vec();
                let _ = self.po_tx.send(data).await;
                samples += self.samples_10ms;
            }
        }
        Ok(())
    }
}

... it calls into let _ = self.po_tx.send(data).await; and never comes back. I know that because I previously had println statements before and after the call.

This tends to work for a period of time and then it stops.
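
One possible explanation, offered as a sketch rather than a diagnosis: if po_tx is the sending half of a bounded channel, a send only completes once the receiving side frees a slot, so a consumer that stops draining would leave the call waiting indefinitely. The example below uses the standard library's sync_channel as a stand-in for the SDK's async channel (an assumption), and fill_until_full is a hypothetical helper that counts how many chunks fit before the channel reports that it is full.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes dummy 10 ms chunks into a bounded channel whose receiver never
/// drains, and returns how many were accepted before the channel was full.
fn fill_until_full(capacity: usize) -> usize {
    // Keep `_rx` alive so the channel reports `Full` rather than `Disconnected`.
    let (tx, _rx) = sync_channel::<Vec<i16>>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(vec![0i16; 240]) {
            Ok(()) => accepted += 1,
            // A blocking (or awaiting) `send` would wait here until a slot frees up.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}
```

If the consumer side has stalled, the number of chunks accepted before the hang would roughly match the channel capacity, which is consistent with playback working for a while and then stopping at varying positions.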

In my tests I applied the same audio each time and it stops processing at random places in the track.
Plus, up to the point where it blocks I can hear the audio playing back on a remote participant (JS Client) with no errors at the client.

Because the source is always the same and I initially hear what I expect to hear, I believe the source is properly framed. If it weren't, I'd expect to hear nothing at all, or for it to fail at the exact same frame every time (which it does not).

What can cause this behavior?
Are there other things I can run to figure this out?

I am otherwise extremely pleased with how all else is working, so any assistance would be greatly appreciated!

jkears commented 3 months ago

I understand this involves .NET, but I really do not think that is the main issue.

In my previous attempts I orchestrated each capture frame from .NET and subscribed to the event that is published once it completes. That often starts out sounding good, but it often hangs. So I tried passing in an entire sentence of audio and processing it all within Rust (code below), and it behaves the same as when I orchestrated individual capture frames.

The source data comes from OpenAI TTS (24 kHz), and I've also tried reading and processing WAV files, with identical results.

Please review this simple code (which plays back very choppily and often hangs) .....
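
For reference, the 10 ms chunk size that capture_frame works with follows directly from the sample rate and channel count. A small sketch, assuming 16-bit PCM (both function names are hypothetical and not part of the SDK):

```rust
/// Number of interleaved samples (all channels together) in 10 ms of audio.
fn samples_per_10ms(sample_rate: u32, num_channels: u32) -> usize {
    (sample_rate / 100) as usize * num_channels as usize
}

/// Number of bytes in 10 ms of 16-bit PCM audio.
fn bytes_per_10ms(sample_rate: u32, num_channels: u32) -> usize {
    samples_per_10ms(sample_rate, num_channels) * std::mem::size_of::<i16>()
}
```

For 24 kHz mono this gives 240 samples, or 480 bytes, per 10 ms chunk.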

fn on_capture_audio_frame_from_net(
    server: &'static FfiServer,
    capture: proto::CaptureAudioFrameFromNetRequest,
) -> FfiResult<proto::CaptureAudioFrameFromNetResponse> {
    println!("Received CaptureAudioFrameFromNetRequest");

    let source = server.retrieve_handle::<audio_source::FfiAudioSource>(capture.source_handle)?;
    let pcm_data = capture.pcm_data;
    let sample_rate: u32 = capture.sample_rate.try_into().expect("Invalid sample rate");
    let num_channels: u32 = capture.num_channels.try_into().expect("Invalid number of channels");

    let source_clone = source.source.clone();
    let async_id = server.next_id();
    println!("Generated async_id: {}", async_id);

    // Adjust the chunk size to 10ms (based on sample_rate and num_channels)
    let ms10_samples = (sample_rate / 100) as usize * num_channels as usize; // 10ms worth of interleaved samples across all channels

    let handle = server.async_runtime.spawn(async move {
        let mut offset = 0;
        while offset < pcm_data.len() {
            let chunk_end = std::cmp::min(offset + ms10_samples * 2, pcm_data.len());
            let frame_chunk = &pcm_data[offset..chunk_end];

            let frame_data: Vec<i16> = frame_chunk
                .chunks_exact(2)
                .map(|chunk| i16::from_le_bytes([chunk[0], chunk[1]]))
                .collect();

            let samples_per_channel = (frame_data.len() / num_channels as usize) as u32;

            let audio_frame = AudioFrame {
                data: Cow::Owned(frame_data),
                sample_rate,
                num_channels,
                samples_per_channel,
            };

            match source_clone {
                #[cfg(not(target_arch = "wasm32"))]
                RtcAudioSource::Native(ref source_native) => {
                    let res = source_native.capture_frame(&audio_frame).await;
                    if let Err(err) = &res {
                        println!("Error capturing frame: {:?}", err);
                    }
                }
                _ => {}
            }

            offset = chunk_end;
        }

        let _ = server.send_event(proto::ffi_event::Message::CaptureAudioFrameFromNet(
            proto::CaptureAudioFrameFromNetCallback {
                async_id: async_id.try_into().unwrap_or_default(),
                error: None.into(),
            },
        ));

        println!("CaptureAudioFrameFromNetCallback sent with async_id: {}", async_id);
    });

    server.watch_panic(handle);

    Ok(proto::CaptureAudioFrameFromNetResponse {
        async_id: async_id.try_into().unwrap_or_default(),
        error: String::new(),
    })
}
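
The byte-to-sample conversion in the handler above can be isolated and checked on its own. This is a minimal sketch, assuming the payload is little-endian 16-bit PCM; pcm_le_bytes_to_i16 is a hypothetical helper. It also reports how many trailing bytes chunks_exact(2) would silently ignore, which can help spot a malformed payload.

```rust
/// Converts little-endian 16-bit PCM bytes into samples.
/// Returns the samples and the number of trailing bytes that were ignored.
fn pcm_le_bytes_to_i16(bytes: &[u8]) -> (Vec<i16>, usize) {
    let chunks = bytes.chunks_exact(2);
    // `remainder()` is whatever does not fill a complete 2-byte sample.
    let leftover = chunks.remainder().len();
    let samples = chunks
        .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
        .collect();
    (samples, leftover)
}
```

A non-zero leftover count means the input length was odd, so the buffer did not contain a whole number of 16-bit samples.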

It is hanging in this logic ...

  pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
        if self.sample_rate != frame.sample_rate || self.num_channels != frame.num_channels {
            return Err(RtcError {
                error_type: RtcErrorType::InvalidState,
                message: "sample_rate and num_channels don't match".to_owned(),
            });
        }
        let mut inner = self.inner.lock().await;
        let mut samples = 0;
        // split frames into 10ms chunks
        loop {
            let remaining_samples = frame.data.len() - samples;
            if remaining_samples == 0 {
                break;
            }
            if (inner.len != 0 && remaining_samples > 0) || remaining_samples < self.samples_10ms {
                let missing_len = self.samples_10ms - inner.len;
                let to_add = missing_len.min(remaining_samples);
                let start = inner.len;
                inner.buf[start..start + to_add]
                    .copy_from_slice(&frame.data[samples..samples + to_add]);
                inner.len += to_add;
                samples += to_add;
                if inner.len == self.samples_10ms {
                    let data = inner.buf.clone().to_vec();
                    //println!("self.po_tx.send1");
                    let _ = self.po_tx.send(data).await;
                    //println!("self.po_tx.send1 back");
                    inner.len = 0;
                }
                continue;
            }
            if remaining_samples >= self.samples_10ms {
                // TODO(theomonnom): avoid copying
                let data = frame.data[samples..samples + self.samples_10ms].to_vec();
                //println!("self.po_tx.send2");
                let _ = self.po_tx.send(data).await;
                //println!("self.po_tx.send2  back");
                samples += self.samples_10ms;
            }
        }
        Ok(())
    }

Specifically, when invoking let _ = self.po_tx.send(data).await; the call goes in but does not seem to return for an extraordinarily long time.
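
To make the 10 ms splitting easier to reason about separately from the channel, the same idea can be sketched as a synchronous re-chunker. This is a simplification, not the SDK's code: Chunker is a hypothetical type, and it always copies through the pending buffer instead of taking the direct-slice fast path that capture_frame uses.

```rust
/// Re-chunks arbitrarily sized input into fixed-size chunks, carrying any
/// partial chunk over to the next call.
struct Chunker {
    chunk_len: usize,
    pending: Vec<i16>,
}

impl Chunker {
    fn new(chunk_len: usize) -> Self {
        assert!(chunk_len > 0, "chunk_len must be non-zero");
        Self { chunk_len, pending: Vec::with_capacity(chunk_len) }
    }

    /// Feeds samples in and returns every chunk that became complete.
    fn push(&mut self, mut input: &[i16]) -> Vec<Vec<i16>> {
        let mut out = Vec::new();
        while !input.is_empty() {
            // Top up the pending buffer first, then emit it once it is full.
            let take = (self.chunk_len - self.pending.len()).min(input.len());
            self.pending.extend_from_slice(&input[..take]);
            input = &input[take..];
            if self.pending.len() == self.chunk_len {
                out.push(std::mem::replace(
                    &mut self.pending,
                    Vec::with_capacity(self.chunk_len),
                ));
            }
        }
        out
    }

    /// Number of samples waiting for the next call to complete a chunk.
    fn pending_len(&self) -> usize {
        self.pending.len()
    }
}
```

Feeding 300 samples into a 240-sample chunker yields one full chunk and leaves 60 pending; a following call with 180 samples completes the second chunk. Nothing in this splitting step can block on its own, which points at the channel send as the place where the wait happens.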

jkears commented 3 months ago

I have this working now, so I will close this. The issue was that the source audio data was not being converted to Float[] correctly.

This is the final design: [image]

Derived from this domain model (which code generates code): [image]

We can model as many AI Assistant aggregates as needed. Each aggregate can be as simple or as complex a composition as needed.
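
The thread does not show the corrected conversion, so the following is only a generic sketch of one common way to move between floating-point samples and the 16-bit PCM that AudioFrame carries. It assumes floats normalized to [-1.0, 1.0]; f32_to_i16_pcm is a hypothetical helper and not the fix that was actually applied.

```rust
/// Converts floating-point samples in [-1.0, 1.0] to 16-bit PCM,
/// clamping out-of-range values instead of letting them wrap.
fn f32_to_i16_pcm(samples: &[f32]) -> Vec<i16> {
    samples
        .iter()
        .map(|&s| (s.clamp(-1.0, 1.0) * i16::MAX as f32).round() as i16)
        .collect()
}
```

Clamping before scaling matters because an unclamped value slightly above 1.0 would otherwise exceed the i16 range.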

The Rust LiveKit SDK is awesome, glad to have our .NET SDK now working... like a long-running appliance!

Many thanks to the maintainers of this repo!!!

Cheers

davidzhao commented 2 months ago

Thanks for sharing your resolution @jkears! Appreciate it!

jkears commented 2 months ago

@davidzhao actually, we had to change the send calls in capture_frame to try_send, because send blocks forever. With try_send, a downstream issue no longer blocks the caller. The error we saw coming back from try_send, and likely the reason for the blocking, was that the buffer was full. The buffer never drains, so send blocks forever. With try_send we are instead notified of the failure or the full buffer and can send the very next frame without any noticeable sound loss. Is there a proper way to handle a full buffer?

pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
        if self.sample_rate != frame.sample_rate || self.num_channels != frame.num_channels {
            return Err(RtcError {
                error_type: RtcErrorType::InvalidState,
                message: "sample_rate and num_channels don't match".to_owned(),
            });
        }

        //print!("capture_frame locking");

        // Wait at most 1 second to acquire the lock
        let lock_result = timeout(Duration::from_secs(1), self.inner.lock()).await;

        // Handle timeout or lock acquisition failure
        let mut inner = match lock_result {
            Ok(inner) => inner, // Lock acquired successfully
            Err(_) => {
                // Timeout occurred
                return Err(RtcError {
                    error_type: RtcErrorType::InvalidState,
                    message: "Failed to acquire lock within 1 second".to_owned(),
                });
            }
        };

        //println!("capture_frame locked");

        let mut samples = 0;

        // split frames into 10ms chunks
        loop {

            let remaining_samples = frame.data.len() - samples;
            if remaining_samples == 0 {
                break;
            }
            if (inner.len != 0 && remaining_samples > 0) || remaining_samples < self.samples_10ms {
                let missing_len = self.samples_10ms - inner.len;
                let to_add = missing_len.min(remaining_samples);
                let start = inner.len;
                inner.buf[start..start + to_add]
                    .copy_from_slice(&frame.data[samples..samples + to_add]);
                inner.len += to_add;
                samples += to_add;
                if inner.len == self.samples_10ms {
                    let data = inner.buf.clone().to_vec();
                    match self.po_tx.try_send(data) {
                        Ok(_) => {
                            // Successfully sent the data
                        }
                        Err(e) => {
                            // Handle the error, e.g., channel is full
                            println!("Failed to send data: {:?}", e);
                            return Err(RtcError {
                                error_type: RtcErrorType::Internal,
                                message: "Failed to send data".to_owned(),
                            });
                        }
                    }
                    inner.len = 0;
                }
                continue;
            }
            if remaining_samples >= self.samples_10ms {
                // TODO: avoid copying
                let data = frame.data[samples..samples + self.samples_10ms].to_vec();

                match self.po_tx.try_send(data) {
                    Ok(_) => {
                        // Successfully sent the data
                    }
                    Err(e) => {
                        // Handle the error, e.g., channel is full
                        println!("Failed to send data: {:?}", e);
                        return Err(RtcError {
                            error_type: RtcErrorType::Internal,
                            message: "Failed to send data".to_owned(),
                        });
                    }
                }
                samples += self.samples_10ms;
            }
        }

        //println!("capture_frame unlocking");

        Ok(())
    }

The specific behavior change is the switch to self.po_tx.try_send(data).
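
On the open question of how to handle a full buffer, one option is to treat "full" and "consumer gone" as different outcomes: drop (and count) the chunk in the first case, and stop producing in the second. The sketch below is not the SDK's API; it uses the standard library's sync_channel as a stand-in for the async channel, and FrameSink and PushOutcome are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum PushOutcome {
    Queued,
    DroppedBecauseFull,
    ConsumerGone,
}

/// Non-blocking producer side that drops the newest chunk when the
/// bounded queue is full and keeps a count of how many were dropped.
struct FrameSink {
    tx: SyncSender<Vec<i16>>,
    dropped: u64,
}

impl FrameSink {
    fn new(capacity: usize) -> (Self, Receiver<Vec<i16>>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: 0 }, rx)
    }

    fn push(&mut self, chunk: Vec<i16>) -> PushOutcome {
        match self.tx.try_send(chunk) {
            Ok(()) => PushOutcome::Queued,
            Err(TrySendError::Full(_)) => {
                // Back-pressure: the consumer is behind, so skip this chunk.
                self.dropped += 1;
                PushOutcome::DroppedBecauseFull
            }
            // The receiver was dropped; retrying can never succeed.
            Err(TrySendError::Disconnected(_)) => PushOutcome::ConsumerGone,
        }
    }
}
```

A steadily growing dropped count would suggest the consumer has stalled rather than briefly fallen behind, which is the condition worth investigating instead of masking. Whether dropping audio is acceptable depends on the application; a bounded wait before giving up is an alternative policy.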