livekit / rust-sdks

LiveKit realtime and server SDKs for Rust
https://livekit.io
Apache License 2.0

Capture Frame Consistently Hangs Audio Output #420

Closed: jkears closed this issue 2 months ago

jkears commented 2 months ago

I am using a modified Unity SDK for .NET, and I am able to connect, create rooms, add participants, process remote participants' audio, and publish an audio track/source that is played back on the client (Blazor, using the JS SDK). This works, but it is highly sensitive to load.

The following diagram depicts the testing approach I am working on, in which a .NET console application creates 100 rooms (the number can vary). Not illustrated is a Blazor client app that connects to its own room and interacts with the AI.

[Diagram: test setup with the .NET console test client creating rooms]

On the backend, the service uses a LiveKit webhook to detect the new rooms created by the test client and creates a separate Room Processor for each one. Each Room Processor has its own Ffi-LiveKit instance, monitors remote participants and audio tracks, and captures the audio (via VAD) to send to the AI and back. This works well: in a test with 100 generated rooms plus the one I use via the Blazor client, I can run the test while speaking through the Blazor app.

The issue I have had (from the start of this journey) is that the `capture_frame` function in the Rust SDK is blocking and is extremely sensitive to any load within the host web service. With only a single Blazor client (i.e. a single room), it works well. However, once I run the test client that creates 100 rooms, it doesn't take long before any ongoing audio stream pushed to a track in the Blazor client's room hangs, and hangs for good. I can never send through the broken channel again.

This improves (at the cost of some frame loss) when I change `send` to `try_send` in the Rust SDK, because at least I am notified of the frame-loss errors. However, the channel is still lost, even after the load ceases.
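The difference between `send` and `try_send` can be seen with any bounded channel. This sketch uses `std::sync::mpsc::sync_channel` as a stand-in, assuming the SDK's `po_tx` is likewise a bounded queue drained at real-time pace by a background task. With nobody draining, a blocking `send` on a full queue would wait indefinitely, while `try_send` returns immediately and distinguishes a full queue (the consumer is behind, so this chunk is dropped) from a disconnected one (the consumer is gone for good). `PushOutcome` and `push_chunk` are hypothetical helpers, not SDK APIs.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Outcome of a non-blocking push of one 10 ms chunk (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum PushOutcome {
    /// The chunk was queued.
    Sent,
    /// Queue full: the consumer is behind. This chunk is dropped,
    /// but a later push may succeed once the consumer catches up.
    Dropped,
    /// Receiver gone: no later push can ever succeed.
    Closed,
}

/// Push without blocking; a blocking `tx.send(chunk)` would instead wait
/// until space frees up, which never happens if the consumer has stalled.
fn push_chunk(tx: &SyncSender<Vec<i16>>, chunk: Vec<i16>) -> PushOutcome {
    match tx.try_send(chunk) {
        Ok(()) => PushOutcome::Sent,
        Err(TrySendError::Full(_)) => PushOutcome::Dropped,
        Err(TrySendError::Disconnected(_)) => PushOutcome::Closed,
    }
}
```

If `try_send` keeps reporting a full queue long after the load has gone, that would suggest the draining side has stopped making progress rather than merely falling behind.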

When this locks up, the only way I know to recover is to disconnect the Blazor client and reconnect. Also, when this occurs, significant errors show up in the JS SDK.

This is the function that hangs forever when using `send`, shown with my modification to use `try_send`:

```rust
// Imports used by this excerpt (module level; the surrounding impl block is elided).
// Assumes `po_tx` is a bounded tokio::sync::mpsc::Sender<Vec<i16>>.
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::timeout;

pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
    if self.sample_rate != frame.sample_rate || self.num_channels != frame.num_channels {
        return Err(RtcError {
            error_type: RtcErrorType::InvalidState,
            message: "sample_rate and num_channels don't match".to_owned(),
        });
    }

    // Wait at most 1 second to *acquire* the lock. This does not limit
    // how long the lock is held once acquired.
    let mut inner = match timeout(Duration::from_secs(1), self.inner.lock()).await {
        Ok(inner) => inner, // lock acquired
        Err(_) => {
            return Err(RtcError {
                error_type: RtcErrorType::InvalidState,
                message: "Failed to acquire lock within 1 second".to_owned(),
            });
        }
    };

    // Non-blocking send of one 10 ms chunk (try_send instead of send().await).
    // Full and Closed are reported separately: Full means the consumer is
    // behind, Closed means the receiving task is gone and cannot recover.
    let send_chunk = |data: Vec<i16>| -> Result<(), RtcError> {
        match self.po_tx.try_send(data) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(_)) => Err(RtcError {
                error_type: RtcErrorType::Internal,
                message: "Failed to send data: channel full".to_owned(),
            }),
            Err(TrySendError::Closed(_)) => Err(RtcError {
                error_type: RtcErrorType::Internal,
                message: "Failed to send data: channel closed".to_owned(),
            }),
        }
    };

    let mut samples = 0;

    // Split the frame into 10 ms chunks.
    loop {
        let remaining_samples = frame.data.len() - samples;
        if remaining_samples == 0 {
            break;
        }

        // Top up a partially filled buffer, or buffer a tail shorter than 10 ms.
        if inner.len != 0 || remaining_samples < self.samples_10ms {
            let missing_len = self.samples_10ms - inner.len;
            let to_add = missing_len.min(remaining_samples);
            let start = inner.len;
            inner.buf[start..start + to_add]
                .copy_from_slice(&frame.data[samples..samples + to_add]);
            inner.len += to_add;
            samples += to_add;
            if inner.len == self.samples_10ms {
                // On error the full buffer is kept and retried on the next call.
                send_chunk(inner.buf.to_vec())?;
                inner.len = 0;
            }
            continue;
        }

        // At least one full 10 ms chunk remains: send it straight from the frame.
        // TODO: avoid copying
        send_chunk(frame.data[samples..samples + self.samples_10ms].to_vec())?;
        samples += self.samples_10ms;
    }

    Ok(())
}
```
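To make the chunking arithmetic in `capture_frame` concrete, here is a std-only model of it. It assumes interleaved i16 samples, so one 10 ms chunk holds `sample_rate / 100 * num_channels` values (480 at 48 kHz mono). `Chunker` is a hypothetical name, not an SDK type, and it always goes through the pending buffer instead of using a separate path for full chunks, which yields the same sequence of chunks.

```rust
/// Hypothetical stand-alone model of the 10 ms chunking loop in `capture_frame`.
struct Chunker {
    samples_10ms: usize,
    /// Samples received so far that do not yet fill a 10 ms chunk.
    pending: Vec<i16>,
}

impl Chunker {
    fn new(sample_rate: u32, num_channels: u32) -> Self {
        // Interleaved audio: 10 ms = sample_rate / 100 frames of num_channels samples each.
        let samples_10ms = (sample_rate / 100 * num_channels) as usize;
        assert!(samples_10ms > 0, "sample rate too low for a 10 ms chunk");
        Chunker { samples_10ms, pending: Vec::with_capacity(samples_10ms) }
    }

    /// Append `data` and return every 10 ms chunk completed by it;
    /// any remainder stays pending for the next call.
    fn push(&mut self, mut data: &[i16]) -> Vec<Vec<i16>> {
        let mut chunks = Vec::new();
        while !data.is_empty() {
            // pending.len() < samples_10ms here, so `take` is at least 1.
            let take = (self.samples_10ms - self.pending.len()).min(data.len());
            self.pending.extend_from_slice(&data[..take]);
            data = &data[take..];
            if self.pending.len() == self.samples_10ms {
                let full = std::mem::replace(
                    &mut self.pending,
                    Vec::with_capacity(self.samples_10ms),
                );
                chunks.push(full);
            }
        }
        chunks
    }

    fn pending_len(&self) -> usize {
        self.pending.len()
    }
}
```

For example, pushing 1,000 mono samples at 48 kHz yields two 480-sample chunks and leaves 40 pending; the next push of 440 samples completes a third chunk that starts with those 40.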

I know when this occurs because I use the `CaptureAudioFrame` event to signal that a frame was processed successfully (or failed), i.e. `let _ = server.send_event(proto::ffi_event::Message::CaptureAudioFrame`. When this event doesn't fire for, say, 10 seconds, I have reached the point of no return and the channel is kaput, so to speak. But is it really? There must be a way to repair this.
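The 10-second heuristic described above can be expressed as a small stall detector. This is a sketch under the assumption that every `CaptureAudioFrame` event, whether it reports success or an error, counts as progress. `CaptureWatchdog` and its methods are hypothetical, and the current time is passed in explicitly so the logic is easy to test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stall detector: if no capture completion has been seen
/// for longer than `threshold`, the outbound audio path is treated as stalled.
struct CaptureWatchdog {
    threshold: Duration,
    last_completion: Instant,
}

impl CaptureWatchdog {
    fn new(threshold: Duration, now: Instant) -> Self {
        CaptureWatchdog { threshold, last_completion: now }
    }

    /// Call whenever a capture completion event arrives (success or error).
    fn record_completion(&mut self, now: Instant) {
        self.last_completion = now;
    }

    /// True once more than `threshold` has elapsed since the last completion.
    /// saturating_duration_since returns zero if `now` is earlier.
    fn is_stalled(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_completion) > self.threshold
    }
}
```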

Are there any low-level commands I can invoke before pushing audio to determine that the connection is broken, and that I can use to attempt a repair?

Interestingly, if I first load 100 rooms from my test client and then connect with the Blazor test client, it works. It only breaks when I am already sending audio out and increased load on the server causes it to hang.

As it stands, this is useless if I cannot recover the audio stream connection on the server side.

Aside from not receiving a `CaptureAudioFrame` event in a timely manner, what other indicators of a broken connection can I look for, and what is a good approach to recovering it?

Thanks in advance!

jkears commented 2 months ago

The behavior is consistent across Linux and Windows services: frame capture is disrupted by the multithreaded, task-switching nature of ASP.NET, and GC is likely playing a role.

To that end, and because our services already run within Dapr in a managed Kubernetes environment (Azure Container Apps), I will process the outbound audio track for each Room Processor using the Dapr Actor Model. Each actor runs as a self-contained unit that receives messages and processes them one at a time on a single thread, without any concurrency or threading concerns.

Thus, when a Room Processor is created via a LiveKit webhook event, I will create one AudioOutboundActor for that Room Processor.

Upon initialization, the actor will create its own FFI client, then create a custom audio source and publish an audio track to the room.

Each sentence returned from a chat completion will be processed sequentially by its respective AudioOutboundActor instance, where the frame data can be sequenced accurately via `capture_frame` in the Rust SDK without any concurrency, threading, or GC issues.
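The one-actor-per-room idea can be illustrated outside Dapr. This Rust sketch is only an analogue and uses neither the Dapr nor the LiveKit API: each room gets a dedicated thread with an mpsc mailbox, which gives the same guarantee that messages are handled one at a time in send order. `AudioOutboundActor` here is a hypothetical Rust type that borrows the name used above; `OutboundMsg` and `actors_for_rooms` are also hypothetical, and counting 10 ms chunks stands in for the real `capture_frame` calls.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical mailbox message: one synthesized sentence of PCM samples.
enum OutboundMsg {
    Sentence(Vec<i16>),
    Shutdown,
}

/// Hypothetical per-room actor: a worker thread draining its own mailbox.
struct AudioOutboundActor {
    tx: Sender<OutboundMsg>,
    handle: JoinHandle<Vec<usize>>,
}

impl AudioOutboundActor {
    fn spawn(samples_10ms: usize) -> Self {
        assert!(samples_10ms > 0, "chunk size must be non-zero");
        let (tx, rx) = mpsc::channel::<OutboundMsg>();
        let handle = thread::spawn(move || {
            let mut processed = Vec::new();
            // Messages are handled strictly one at a time, in the order sent.
            for msg in rx {
                match msg {
                    OutboundMsg::Sentence(pcm) => {
                        // Stand-in for capture_frame: count the 10 ms chunks.
                        processed.push(pcm.chunks(samples_10ms).count());
                    }
                    OutboundMsg::Shutdown => break,
                }
            }
            processed
        });
        AudioOutboundActor { tx, handle }
    }

    /// Enqueue a sentence; returns false if the actor has already exited.
    fn send_sentence(&self, pcm: Vec<i16>) -> bool {
        self.tx.send(OutboundMsg::Sentence(pcm)).is_ok()
    }

    /// Stop after draining earlier messages; returns chunk counts per sentence.
    fn stop(self) -> Vec<usize> {
        // A send error only means the worker has already exited.
        let _ = self.tx.send(OutboundMsg::Shutdown);
        self.handle.join().expect("actor thread panicked")
    }
}

/// One actor per room, keyed by room name.
fn actors_for_rooms(rooms: &[&str], samples_10ms: usize) -> HashMap<String, AudioOutboundActor> {
    rooms
        .iter()
        .map(|room| (room.to_string(), AudioOutboundActor::spawn(samples_10ms)))
        .collect()
}
```

Because each room's audio is fed through a single sequential consumer, sentences for one room can never interleave, and a slow room does not block another room's mailbox.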

I think this approach will play back consistently because, when I ran a series of tests in a .NET console app using the same .NET SDK as the ASP.NET web service, playback was perfect, so I am hopeful the Dapr actor's single-threaded processing will behave the same way.

I will close this as I don't think it's a bug. I'll update this post once I learn more to help others who might be on a similar journey.