y-crdt / yrs-warp

Yrs web socket data exchange protocol implementation for tokio warp server

Multi-peer broadcasting failing with BroadcastGroup example #19


pkpbynum commented 1 week ago

Hi there, I have a setup running this exact example from the README with 2 clients using the WebsocketProvider from y-websocket, but no updates are broadcast across peers. After some digging, I discovered that all updates are marked as pending and never invoke the document's observe_update_v1 callback. I then discovered that the Yjs protocol is actually implemented by WarpConn, and the callback does fire when using it, but this PR suggests that WarpConn is no longer necessary.
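
For reference, this is roughly the check I did. It's a sketch only, meant to replace the awareness setup inside main in the example below, and the exact observe_update_v1 signature may differ between yrs versions:

// Sketch: log every v1 update the shared Doc actually applies, to see
// whether broadcast updates reach the server-side document at all.
let doc = Doc::new();
let _sub = doc
    .observe_update_v1(|_txn, event| {
        println!("doc applied an update of {} bytes", event.update.len());
    })
    .expect("failed to attach update observer");
// Keep `_sub` alive for as long as the observer should stay registered.
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));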

To my eyes it seems like the BroadcastGroup should be sending SyncStep1 in its subscribe methods in order to finalize the connection. I think another issue is that there is a lot of duplication across yrs, y-sync, and yrs-warp (e.g. BroadcastGroup exists in more than one of them), and it's difficult to tell which one I should be using.
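
For what it's worth, by "sending SyncStep1" I mean something along these lines, using the y_sync message types. This is just a sketch of the idea, not code from this repo; I'm assuming the sink passed to subscribe is reachable here, and the import paths may vary with the yrs version:

use futures_util::SinkExt;
use y_sync::sync::{Message, SyncMessage};
use yrs::updates::encoder::Encode;
use yrs::{ReadTxn, Transact};

// Sketch: when a peer subscribes, encode SyncStep1 (the server's state
// vector) and send it down that peer's sink, so the client can reply with
// SyncStep2 and the documents converge.
let sv = {
    let awareness = awareness.read().await;
    awareness.doc().transact().state_vector()
};
let payload = Message::Sync(SyncMessage::SyncStep1(sv)).encode_v1();
let mut sink = sink.lock().await;
sink.send(payload).await?;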

Is it possible to get a working example which actually broadcasts both awareness and document updates across multiple peers using this library? Apologies if I'm missing something, or if this library is not actually ready for use.

Code is here (with imports):

use std::sync::Arc;

use futures_util::StreamExt;
use tokio::sync::{Mutex, RwLock};
use warp::{
    filters::ws::{WebSocket, Ws},
    reject::Rejection,
    reply::Reply,
    Filter,
};
use y_sync::{awareness::Awareness, net::BroadcastGroup};
use yrs::Doc;
use yrs_warp::ws::{WarpSink, WarpStream};

#[tokio::main]
async fn main() {
    // We're using a single static document shared among all the peers.
    let awareness = Arc::new(RwLock::new(Awareness::new(Doc::new())));

    // open a broadcast group that listens to awareness and document updates
    // and has a pending message buffer of up to 32 updates
    let bcast = Arc::new(BroadcastGroup::new(awareness, 32).await);

    let ws = warp::path("my-room")
        .and(warp::ws())
        .and(warp::any().map(move || bcast.clone()))
        .and_then(ws_handler);

    warp::serve(ws).run(([0, 0, 0, 0], 8000)).await;
}

async fn ws_handler(ws: Ws, bcast: Arc<BroadcastGroup>) -> Result<impl Reply, Rejection> {
    Ok(ws.on_upgrade(move |socket| peer(socket, bcast)))
}

async fn peer(ws: WebSocket, bcast: Arc<BroadcastGroup>) {
    let (sink, stream) = ws.split();
    let sink = Arc::new(Mutex::new(WarpSink::from(sink)));
    let stream = WarpStream::from(stream);
    let sub = bcast.subscribe(sink, stream);
    match sub.completed().await {
        Ok(_) => println!("broadcasting for channel finished successfully"),
        Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e),
    }
}
Horusiath commented 1 week ago

Thanks @pkpbynum I'll take a look at it.

pkpbynum commented 6 days ago

This diff fixes it for me, but it looks like I don't have permission to push a branch. It also seems like this would be a bug in y-sync as well, since it has its own BroadcastGroup?

diff --git a/src/broadcast.rs b/src/broadcast.rs
index 54d60ff..0471a37 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -169,6 +169,16 @@ impl BroadcastGroup {
         let stream_task = {
             let awareness = self.awareness().clone();
             tokio::spawn(async move {
+                let payload = {
+                    let mut encoder = EncoderV1::new();
+                    let awareness = awareness.read().await;
+                    protocol.start(&awareness, &mut encoder)?;
+                    encoder.to_vec()
+                };
+                if !payload.is_empty() {
+                    let mut s = sink.lock().await;
+                    s.send(payload).await.map_err(|e| Error::Other(e.into()))?;
+                }
                 while let Some(res) = stream.next().await {
                     let msg = Message::decode_v1(&res.map_err(|e| Error::Other(Box::new(e)))?)?;
                     let reply = Self::handle_msg(&protocol, &awareness, msg).await?;
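
If it helps review: as far as I can tell, the added block mirrors what the protocol's start step already does for WarpConn. It encodes SyncStep1 (the server's state vector) plus the current awareness state and sends it to the newly subscribed sink, so the client answers with SyncStep2 and both sides converge. One assumption on my side is that broadcast.rs also needs the encoder and sink extension in scope for this to compile, roughly the following (they may already be imported there, and paths may differ by yrs version):

use futures_util::SinkExt;
use yrs::updates::encoder::{Encoder, EncoderV1};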