livekit / rust-sdks

LiveKit realtime and server SDKs for Rust
https://livekit.io
Apache License 2.0
185 stars 44 forks source link

RoomEvent not being received #407

Closed Thintin914 closed 1 week ago

Thintin914 commented 2 weeks ago

Hello, I'm experiencing something weird in my code.

In fn new_room(), it creates a new room and update the metadata to test1 after 8 seconds. In fn event(), it successfully prints out the RoomMetadataChanged event.

When I type something in my chat bar, it calls fn send_message(). It will update room metadata to test2. It prints "update_room_metadata call succeeded". However, when I prints the room metadata after 2 seconds, it's still the old metadata. And RoomEvent in fn event() is not being received. The same situation goes for RoomService::send_data as well, where send_data from fn new_room() will be received by fn event() but send_data from fn send_message() will not.

But when I leave the room by calling RoomService::remove_participant and join the same room, the metadata is indeed changed to test2. It seems like RoomEvent is not being received from fn send_message().

fn new_room()

    pub fn new_room(&mut self, room_id: String, user_uuid: String, metadata: Option<String>) {
        let room_arc = Arc::clone(&self.room);
        let room_rx_arc = Arc::clone(&self.room_rx);
        let room_service_arc = Arc::clone(&self.room_service);

        std::thread::spawn(move || {
            tokio::runtime::Runtime::new().unwrap().block_on(async {
               **// Create room**
                let (url, api_key, api_secret, https_url) = get_livekit_env();
                let room_service = RoomClient::with_api_key(&https_url, &api_key, &api_secret);

                let _ = room_service
                .create_room(&room_id, CreateRoomOptions {
                    empty_timeout: 30,
                    ..default()
                }).await;

                let (room, rx) = connect(&room_id, &user_uuid, &url, &api_key, &api_secret, true).await;

                *room_arc.lock().unwrap() = Some(room);
                *room_rx_arc.lock().unwrap() = Some(rx);
                *room_service_arc.lock().unwrap() = Some(room_service);

                **// Update room metadata after 8 seconds**
                if let Some(room_service) = room_service_arc.lock().unwrap().as_ref() {
                    std::thread::sleep(std::time::Duration::from_millis(8000));
                    println!("send metadata");
                    let _ = room_service.update_room_metadata(&room_id, "test1").await;
                }

            });
        });

        **// Create a new thread to receive RoomEvent**
        self.event();
    }

fn event()

   **// Print RoomEvent**
    fn event(&mut self) {
        let room_rx_arc = Arc::clone(&self.room_rx);

        std::thread::spawn(move || {
            println!("wait room exist");
            loop {
                if let Some(_) = room_rx_arc.lock().unwrap().as_ref() {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(200));
            };
            println!("start event loop");

            #[allow(clippy::empty_loop)]
            loop {
                if let Some(room_rx) = room_rx_arc.lock().unwrap().as_mut() {
                    if let Ok(room_event) = room_rx.try_recv() {
                        println!("---");
                        match room_event {
                            _ => println!("{:?}", room_event)
                        }
                    }
                }
            }
        });
    }

fn send_message()

   **// update metadata to "test2" on chat button clicked**
    pub fn send_message(&self, room_id: String, topic: String, message: String) {
        let room_arc = Arc::clone(&self.room);
        let room_service_arc = Arc::clone(&self.room_service);

        std::thread::spawn(move || {
            tokio::runtime::Runtime::new().unwrap().block_on(async {  
                if let Some(room_service) = room_service_arc.lock().unwrap().as_ref() {

                    match room_service.update_room_metadata(&room_id, "test2").await {
                        Ok(_) => println!("update_room_metadata call succeeded"),
                        Err(e) => println!("update_room_metadata call failed: {:?}", e),
                    }                                
                    if let Some(room) = room_arc.lock().unwrap().as_ref() {
                        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
                        println!("new metadata: {}", room.metadata());
                    }
                }
            });
        });
    }

I can provide full code if it's possible.

Thintin914 commented 2 weeks ago

Ok I figured out I need to wrap the room_rx inside an async_std::task::block_on loop like the example from basic_async_room. And then I need to send the RoomEvent to main thread with work-stealing. Using std::thread::spawn won't receive the RoomEvent. Kinda confusing behavior.

async fn event(mut room_rx: UnboundedReceiver<RoomEvent>, stop_room_receiver: Receiver<bool>, room_data_sender: Sender<livekit::RoomEvent>) {
    async_std::task::block_on(async {
        loop {
            if let Ok(stop) = stop_room_receiver.try_recv() {
                if stop {
                    drop(stop_room_receiver);
                    drop(room_data_sender);    
                    println!("stop loop");
                    break;
                }
            }

            if let Ok(room_event) = room_rx.try_recv() {
                let _ = room_data_sender.send(room_event);
            }
        }
    });
}