y-crdt / yrs-warp

Yrs web socket data exchange protocol implementation for tokio warp server

BroadcastGroup according to room name/id #15

jkbstrmen closed 1 year ago

jkbstrmen commented 1 year ago

Hi,

Is there a way to get an already open BroadcastGroup by room name, or to create a new BroadcastGroup at runtime if none is open for that room yet? I tried to pass an Arc<Mutex> to the warp filter, but I cannot create a new BroadcastGroup and add it to the context in ws_handler, because BroadcastGroup::new() is async.

I was thinking about something like this (pseudocode):

async fn ws_handler(room_id: String, ws: Ws, context: Arc<Mutex<SomeKindOfContext>>) -> Result<impl Reply, Rejection> {
    let context_locked = context.lock().unwrap();
    let room_broadcast = context_locked.get_broadcast_for_room_if(room_id);
    let bcast;
    if let Some(broadcast) = room_broadcast {
        bcast = broadcast;
    } else {
        bcast = create_broadcast(context_locked, room_id);
    }
    Ok(ws.on_upgrade(move |socket| peer(socket, bcast)))
}

Thanks for any suggestions.

Horusiath commented 1 year ago

Wouldn't something like that work?

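use std::collections::hash_map::{Entry, HashMap};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock}; // async-aware locks: the guard may be held across .await

let rooms = Arc::new(Mutex::new(HashMap::new()));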

// create or return room
let room = {
    let mut rooms = rooms.lock().await;
    match rooms.entry(room_id) {
        Entry::Vacant(e) => {
            // establish a new broadcast group
            let awareness = Arc::new(RwLock::new(Awareness::default()));
            let group = Arc::new(BroadcastGroup::new(awareness, BUFFER_CAP).await);
            e.insert(group).clone()
        }
        Entry::Occupied(e) => e.get().clone(), // return existing broadcast group
    }
};

jkbstrmen commented 1 year ago

Thanks for your answer. That is exactly what I was trying to do. My problem was that I used std::sync::Mutex instead of tokio::sync::Mutex, which produced the compile-time error "future cannot be sent between threads safely".

Great work here by the way. Thanks for that as well.
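
Note on the error discussed above: a std::sync::MutexGuard is not Send, so a future that keeps one alive across an .await is not Send either, and warp requires Send futures from its handlers. Switching to tokio::sync::Mutex, as in the reply, is one fix. The sketch below shows a possible alternative that keeps std::sync::Mutex by never holding the guard across an .await. It is only an illustration under stated assumptions: `Group` is a hypothetical stand-in for BroadcastGroup (only its async constructor matters), and `get_or_create`, `assert_send`, and `block_on` are made-up helpers. `block_on` is a toy executor used only so the example runs without tokio.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for BroadcastGroup: all that matters is the async constructor.
struct Group {
    room_id: String,
}

impl Group {
    async fn new(room_id: &str) -> Group {
        Group { room_id: room_id.to_string() }
    }
}

type Rooms = Arc<Mutex<HashMap<String, Arc<Group>>>>;

// Get the group for `room_id`, creating it if missing, without holding
// the std mutex guard across an .await.
async fn get_or_create(rooms: Rooms, room_id: String) -> Arc<Group> {
    // The guard is a temporary that is dropped at the end of this statement.
    let existing = rooms.lock().unwrap().get(&room_id).cloned();
    if let Some(group) = existing {
        return group;
    }
    // No lock is held while the async constructor runs.
    let fresh = Arc::new(Group::new(&room_id).await);
    // Re-lock; if another task inserted first, keep its group and drop ours.
    let mut map = rooms.lock().unwrap();
    let group = map.entry(room_id).or_insert(fresh).clone();
    group
}

// Compile-time check that a value (here: a future) is Send.
fn assert_send<T: Send>(_: &T) {}

// Toy executor for this demo only: polls in a loop until the future is ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The trade-off is that two tasks racing on the same new room may both run the constructor, and one result is thrown away. Holding a tokio::sync::Mutex across the .await, as in the reply above, avoids that duplicate work at the cost of serializing room creation.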