Totodore / socketioxide

A socket.io server implementation in Rust that integrates with the Tower ecosystem and the Tokio stack.
https://docs.rs/socketioxide
MIT License

Migrate Signaling Socket.io JS into Socketioxide Rust #347

Closed by aryabp 1 month ago

aryabp commented 1 month ago

DOMException: Failed to execute 'createAnswer' on 'RTCPeerConnection': PeerConnection cannot create an answer in a state other than have-remote-offer or have-local-pranswer.

I get this error when calling createAnswer(), but with the JS Socket.io server it runs fine.

Reference: https://acidtango.com/thelemoncrunch/how-to-implement-a-video-conference-with-webrtc-and-node/
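For readers unfamiliar with the rule quoted in the error message, here is a minimal sketch of it in Rust. The state names follow the RTCPeerConnection signaling states; the enum and the `can_create_answer` helper are hypothetical names used only for illustration and are not part of any library.

```rust
/// Signaling states of an RTCPeerConnection (names follow the WebRTC spec).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SignalingState {
    Stable,
    HaveLocalOffer,
    HaveRemoteOffer,
    HaveLocalPranswer,
    HaveRemotePranswer,
}

/// Hypothetical helper mirroring the rule in the error message:
/// an answer can only be created in `have-remote-offer` or `have-local-pranswer`.
fn can_create_answer(state: SignalingState) -> bool {
    matches!(
        state,
        SignalingState::HaveRemoteOffer | SignalingState::HaveLocalPranswer
    )
}
```

Under this rule, a peer that has only set its own offer (`HaveLocalOffer`) or has not yet applied a remote offer (`Stable`) cannot create an answer, which is the situation the DOMException describes.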

Totodore commented 1 month ago

Could you add a debug log of socketioxide please? You have to enable the tracing feature in socketioxide and set the log level to debug.

aryabp commented 1 month ago

There are no errors in the socketioxide code, but the error occurs in the socket.io client code.

Here are some snippets:

main.rs

mod state;

use std::sync::{Arc, Mutex};

use axum::routing::get;
use serde::{Deserialize, Serialize};

use socketioxide::{
    extract::{Data, SocketRef, State},
    SocketIo,
};
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::cors::CorsLayer;
use tracing::info;
use tracing_subscriber::FmtSubscriber;

////////Real-time chat

#[derive(Debug, Deserialize)]
struct MessageIn {
    room: String,
    text: String,
}

#[derive(Serialize)]
struct Messages {
    messages: Vec<state::Message>,
}

////////////Signaling

type RoomCounterStore = Arc<Mutex<Vec<String>>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
struct SDP {
    sdp: String,
    #[serde(rename = "type")]
    r#type: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
struct WebRTC {
    #[serde(rename = "type")]
    r#type: String,
    sdp: SDP,
    room_id: String,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct ICE {
    room_id: String,
    label: i32,
    candidate: String,
}

async fn on_connect(socket: SocketRef) {
    info!("socket connected: {}", socket.id);

    ///////////Real-time chat

    socket.on(
        "joinchat",
        |socket: SocketRef, Data::<String>(room), store: State<state::MessageStore>| async move {
            info!("Received joinchat: {:?}", room);
            let _ = socket.leave_all();
            let _ = socket.join(room.clone());
            let messages = store.get(&room).await;
            let _ = socket.emit("messages", Messages { messages });
        },
    );
    socket.on(
        "messagechat",
        |socket: SocketRef, Data::<MessageIn>(data), store: State<state::MessageStore>| async move {
            info!("Received messagechat: {:?}", data);

            let response = state::Message {
                text: data.text,
                user: format!("anon-{}", socket.id),
                date: chrono::Utc::now(),
            };

            store.insert(&data.room, response.clone()).await;

            let _ = socket.within(data.room).emit("messagechat", response);
        },
    );
}
async fn on_signal(socket: SocketRef) {
    info!("socket connected: {}", socket.id);

    ///////////Signaling

    socket.on(
        "join",
        |socket: SocketRef, Data::<String>(room_id), count: State<RoomCounterStore>| async move {
            match count.lock() {
                Ok(mut e) => {
                    let x: usize = e.iter().filter(|x| **x == room_id).count();
                    if x == 0 {
                        info!(
                            "Creating room {:?} and emitting room_created socket event",
                            room_id
                        );
                        e.push(room_id.clone());
                        let _ = socket.leave_all();
                        let _ = socket.join(room_id.clone());
                        let _ = socket.emit("room_created", room_id);
                    } else if x == 1 {
                        info!(
                            "Joining room : {:?} and emitting room_joined socket event",
                            room_id
                        );
                        e.push(room_id.clone());
                        let _ = socket.leave_all();
                        let _ = socket.join(room_id.clone());
                        let _ = socket.emit("room_joined", room_id);
                    } else {
                        info!(
                            "Can't join room {:?}, emitting full_room socket event",
                            room_id
                        );
                        let _ = socket.emit("full_room", room_id);
                    }
                }
                Err(_) => {}
            }
        },
    );
    socket.on(
        "start_call",
        |socket: SocketRef, Data::<String>(room_id)| async move {
            info!(
                "Broadcasting start_call event to peers in room {:?}",
                room_id
            );
            let _ = socket
                .broadcast()
                .to(room_id)
                .emit("start_call", "whatever");
        },
    );
    socket.on(
        "webrtc_offer",
        |socket: SocketRef, Data::<WebRTC>(webrtc_sdp)| async move {
            info!(
                "Broadcasting webrtc_offer event to peers in room {:?}",
                webrtc_sdp.room_id
            );
            let _ = socket
                .broadcast()
                .to(webrtc_sdp.room_id)
                .emit("webrtc_offer", webrtc_sdp.sdp);
        },
    );
    socket.on(
        "webrtc_answer",
        |socket: SocketRef, Data::<WebRTC>(webrtc_sdp)| async move {
            info!(
                "Broadcasting webrtc_answer event to peers in room {:?}",
                webrtc_sdp.room_id
            );
            let _ = socket
                .broadcast()
                .to(webrtc_sdp.room_id)
                .emit("webrtc_answer", webrtc_sdp.sdp);
        },
    );
    socket.on(
        "webrtc_ice_candidate",
        |socket: SocketRef, Data::<ICE>(ice)| async move {
            let room_id = ice.room_id.clone();
            info!(
                "Broadcasting webrtc_ice_candidate event to peers in room {:?}",
                room_id
            );
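            // Forward the candidate under its own event name, matching the log line above;
            // emitting "webrtc_offer" here would make peers treat it as an SDP offer.
            let _ = socket
                .broadcast()
                .to(room_id)
                .emit("webrtc_ice_candidate", ice);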
        },
    );
}

async fn handler(axum::extract::State(io): axum::extract::State<SocketIo>) {
    info!("handler called");
    let _ = io.emit("hello", "world");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing::subscriber::set_global_default(FmtSubscriber::default())?;

    let messages = state::MessageStore::default();
    let roomcount = RoomCounterStore::default();
    let (layer, io) = SocketIo::builder()
        .with_state(messages)
        .with_state(roomcount)
        .build_layer();

    io.ns("/", on_connect);
    io.ns("/signal", on_signal);

    let app = axum::Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .route("/hello", get(handler))
        .with_state(io)
        .layer(
            ServiceBuilder::new()
                .layer(CorsLayer::permissive())
                .layer(layer),
        );

    info!("Starting server");

    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();

    Ok(())
}

state.rs

use std::{collections::{HashMap, VecDeque}, sync::Arc};
use tokio::sync::RwLock;

#[derive(serde::Serialize, Clone, Debug)]
pub struct Message {
    pub text: String,
    pub user: String,
    pub date: chrono::DateTime<chrono::Utc>,
}

pub type RoomStore = HashMap<String, VecDeque<Message>>;

#[derive(Clone, Default)]
pub struct MessageStore {
    pub messages: Arc<RwLock<RoomStore>>,
}

impl MessageStore {
    pub async fn insert(&self, room: &str, message: Message) {
        let mut binding = self.messages.write().await;
        let messages = binding.entry(room.to_owned()).or_default();
        messages.push_front(message);
        messages.truncate(20);
    }

    pub async fn get(&self, room: &str) -> Vec<Message> {
        let messages = self.messages.read().await.get(room).cloned();
        messages.unwrap_or_default().into_iter().rev().collect()
    }
}

aryabp commented 1 month ago

I am sorry if my contribution looks like a hit and run, but I am in a rush with my paper and project. I will continue this thread once I have debugged it. cya

Totodore commented 1 month ago

@aryabp I have added an example in the repository with the correct implementation. I suspect that you had an issue with room management, which would lead to inconsistent state and then invalid WebRTC calls. The solution I implemented is only the one taken from the article and does not include the messaging part of your code.
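As an illustration of the room-management concern, note that the `join` handler posted above only ever pushes room ids into the shared `Vec<String>` and never removes them, so a room would stay "full" after its peers leave. The following is a minimal, standard-library-only sketch of occupancy tracking that also handles leaving. `RoomRegistry`, `JoinOutcome`, and `MAX_PEERS` are hypothetical names, not part of socketioxide or of the example in the repository, and wiring `leave` to a disconnect handler is left out.

```rust
use std::collections::HashMap;

/// Outcome of a join attempt, named after the events used in this thread.
#[derive(Debug, PartialEq, Eq)]
enum JoinOutcome {
    RoomCreated,
    RoomJoined,
    FullRoom,
}

/// Hypothetical registry mapping a room id to the number of peers currently in it.
#[derive(Default)]
struct RoomRegistry {
    occupancy: HashMap<String, usize>,
}

impl RoomRegistry {
    /// Assumed limit of two peers per room, as in the posted signaling code.
    const MAX_PEERS: usize = 2;

    /// Registers a peer in the room and reports which event should be emitted.
    fn join(&mut self, room_id: &str) -> JoinOutcome {
        let count = self.occupancy.entry(room_id.to_owned()).or_insert(0);
        match *count {
            0 => {
                *count = 1;
                JoinOutcome::RoomCreated
            }
            n if n < Self::MAX_PEERS => {
                *count += 1;
                JoinOutcome::RoomJoined
            }
            _ => JoinOutcome::FullRoom,
        }
    }

    /// Removes one peer from the room and drops the entry once the room is empty.
    fn leave(&mut self, room_id: &str) {
        let now_empty = match self.occupancy.get_mut(room_id) {
            Some(count) => {
                *count = count.saturating_sub(1);
                *count == 0
            }
            None => false,
        };
        if now_empty {
            self.occupancy.remove(room_id);
        }
    }
}
```

With something like this behind the shared state, a third `join` on the same room yields `FullRoom`, while a `leave` frees a slot so the next `join` yields `RoomJoined` again.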