snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License

Question: Correctly Handling Reconnection #290

Closed: Mictat closed this issue 1 year ago.

Mictat commented 1 year ago

Issue: I've been having trouble reconnecting correctly to a particular server after my messages fall out of sync with it. When I attempt to reconnect, the server often (though not always) keeps replying with updates instead of a fresh snapshot.

Context: This particular server requires that I send an initial message to authenticate; it then replies with a snapshot, followed by updates. When a new update fails validation (meaning we're out of sync), I send a close frame, wait to receive a close frame from the server, and then drop my message handler (and therefore the stream and sink). I then create a new connection and spawn a new message handler.
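One detail of the close-then-reconnect sequence described above: frames the server sent before it saw our Close frame can still arrive while we wait for its Close, and they belong to the old session. The sketch below uses a hypothetical `Frame` enum (not tungstenite's `Message`) and a plain iterator in place of the async stream, purely to illustrate the draining step: everything received before the peer's Close is collected for discarding, and a stream that ends without a Close is reported separately.

```rust
/// Hypothetical, simplified stand-in for tungstenite's `Message`.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Text(String),
    Close,
}

/// Called after OUR Close frame has been sent. Reads until the peer's Close
/// arrives and returns `(discarded, saw_close)`: the text frames that arrived
/// in between (old-session data that should not be applied to fresh state),
/// and whether the closing handshake actually completed.
pub fn drain_until_close<I>(incoming: I) -> (Vec<String>, bool)
where
    I: IntoIterator<Item = Frame>,
{
    let mut discarded = Vec::new();
    for frame in incoming {
        match frame {
            // In-flight update from the old session: keep it out of the new state.
            Frame::Text(text) => discarded.push(text),
            // Peer acknowledged our Close: the handshake is complete.
            Frame::Close => return (discarded, true),
        }
    }
    // The stream ended without a Close frame (e.g. the TCP connection dropped).
    (discarded, false)
}
```

Anything collected here should be thrown away rather than forwarded to the validator, since the next valid state has to come from the new connection's snapshot.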

I've tried reconnecting from Python and that seems to work fine, so I'm inclined to think the problem is my implementation (especially given my inexperience with Rust).

Code: I'm aware that I don't need to handle the ConnectionClosed error explicitly, because it is never returned.

```rust
use futures_util::StreamExt; // provides `read.next()`
use tokio::sync::broadcast::{Receiver, Sender};
use tokio_tungstenite::tungstenite::{Error, Message};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

// `HandshakeInfo`, `connect_ws_retry` and `send_messages` are my own helpers (not shown).

/// Connects to the server with retry logic and spawns tasks to manage the incoming and
/// outgoing messages (released on reset). Also sends the on-init message, then waits
/// for the reset signal before reconnecting.
async fn handle_connection<F1>(
    get_handshake_info: F1,
    on_init_msg: Option<Message>,
    update_tx: Sender<String>,  // channel to process incoming messages
    msg_tx: Sender<Message>,    // channel to send messages to the server
    mut reset_rx: Receiver<()>, // reset event receiver
    reset_tx: Sender<()>,       // reset event sender
) where
    F1: Fn() -> HandshakeInfo + Send + 'static,
{
    loop {
        let handshake_info = get_handshake_info();
        let split = connect_ws_retry(&handshake_info, 5).await.unwrap();

        // spawn a task (not a thread) to handle incoming messages
        tokio::spawn(handle_messages(
            split.read,
            update_tx.clone(),
            reset_tx.clone(),
        ));

        // spawn a task to handle outgoing messages
        tokio::spawn(send_messages(split.write, msg_tx.subscribe()));

        if let Some(msg) = &on_init_msg {
            // send the on-init message
            msg_tx.send(msg.clone()).unwrap();
        }

        // wait for the reset signal before reconnecting
        reset_rx.recv().await.unwrap();
    }
}

pub type WsRead =
    futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;

/// Handles incoming messages. On receiving a close message, the reset signal is fired.
async fn handle_messages(mut read: WsRead, update_tx: Sender<String>, reset_tx: Sender<()>) {
    // Read from the server and forward text messages along the channel.
    // `while let` exits once the stream is exhausted (`next()` yields `None`);
    // an `if let` inside a bare `loop` would spin forever at that point.
    while let Some(message) = read.next().await {
        match message {
            Ok(Message::Text(text)) => {
                // println!("Message: {}", text.chars().take(50).collect::<String>());
                update_tx.send(text).unwrap();
            }
            Ok(Message::Close(_)) => {
                println!("Close message received! Sending reset signal down channel...");
                reset_tx.send(()).unwrap();
                break; // end this task, which drops `read` and the senders
            }
            Ok(_) => (),
            Err(Error::ConnectionClosed) => {
                println!("Connection closed error!");
                break; // end this task, which drops `read` and the senders
            }
            Err(_) => (),
        }
    }
}
```
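As far as I can tell from tokio-tungstenite's `Stream` implementation, a closed connection is reported by the stream ending (`read.next().await` returning `None`) rather than by an `Err(Error::ConnectionClosed)` item, and the stream also ends after any other error. A read loop therefore has to treat the end of the stream as an exit path too; a `loop { if let Some(..) = read.next().await { .. } }` shape never leaves the loop once the stream is exhausted. The sketch below is a synchronous model with hypothetical `Incoming` and `Exit` types, using an iterator as a stand-in for the stream; it reports which of the three ways the loop ended.

```rust
/// Hypothetical, simplified view of what a WebSocket read half yields.
#[derive(Debug)]
pub enum Incoming {
    Text(String),
    Close,
    Error(String),
}

/// Why the read loop stopped.
#[derive(Debug, PartialEq)]
pub enum Exit {
    CloseFrame,
    Error(String),
    StreamEnded,
}

/// Forwards text frames to `on_text` and returns as soon as the connection is
/// over, whichever way that happens.
pub fn pump<I, F>(source: I, mut on_text: F) -> Exit
where
    I: IntoIterator<Item = Incoming>,
    F: FnMut(String),
{
    for item in source {
        match item {
            Incoming::Text(text) => on_text(text),
            Incoming::Close => return Exit::CloseFrame,
            Incoming::Error(e) => return Exit::Error(e),
        }
    }
    // Source exhausted: the async equivalent is `read.next().await` returning `None`.
    Exit::StreamEnded
}
```

In the async handler, the analogous shape is `while let Some(message) = read.next().await { .. }` with the reset signal sent once after the loop, rather than only in the Close branch, so the reconnect loop is woken up however the connection ended.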

Question: I'm not sure whether this is a server-specific issue or a problem with my implementation. Is this a reasonable way to handle reconnection to a server? Is there any reason why my connection might not be dropped correctly before I reconnect?
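On the question of dropping the connection: `StreamExt::split` from futures-util returns two halves that share the underlying `WebSocketStream` (through a `BiLock`), so the socket is only dropped once both the read half and the write half are dropped. `handle_messages` drops the read half when it returns, but the write half lives in the `send_messages` task, whose body isn't shown. If that task loops on its broadcast receiver until the channel closes, it never finishes, because `msg_tx` outlives every connection; the old socket would then stay open, and the old task would also receive the next on-init message. This may or may not explain the server's behaviour, but it is worth ruling out. The sketch below models the situation with std threads and channels; `Conn`, `split` and `spawn_writer` are hypothetical names, not library APIs. A writer that owns a per-connection channel exits when that channel's sender is dropped, and only then is the shared connection closed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the underlying socket; records when it is dropped.
pub struct Conn {
    closed: Arc<AtomicBool>,
}

impl Drop for Conn {
    fn drop(&mut self) {
        // In the real code, dropping the socket is what closes the TCP connection.
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Stand-in for `StreamExt::split`: both halves share one `Conn`, which is
/// only dropped when the last half is dropped.
pub fn split(conn: Conn) -> (Arc<Conn>, Arc<Conn>) {
    let shared = Arc::new(conn);
    (Arc::clone(&shared), shared)
}

/// Writer for ONE connection: owns the write half plus a receiver used only by
/// this connection. It returns (dropping the write half) once every sender for
/// its channel has been dropped, reporting how many messages it handled.
pub fn spawn_writer(write: Arc<Conn>, rx: mpsc::Receiver<String>) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let _write = write; // keeps the shared connection alive while running
        let mut sent = 0;
        for _msg in rx {
            // A real writer would forward `_msg` to the sink here.
            sent += 1;
        }
        sent
    })
}
```

With tokio, the equivalent would be creating a fresh `tokio::sync::mpsc` channel inside the reconnect loop, or keeping the `JoinHandle` returned by `tokio::spawn(send_messages(..))` and calling `abort()` on it (then awaiting it) once the reset signal arrives.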

Any help or tips would be greatly appreciated!

sandipndev commented 6 months ago

Hi, did you get a resolution?