snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars 236 forks source link

Not able to detect websocket close consistently #354

Closed devsheke closed 2 weeks ago

devsheke commented 2 weeks ago

Hi, I have been having issues with consistently detecting websocket close notifications sent from the server. In addition to thiss, the messages are still being forwarded to the write_stream even though the connection has been closed. Due this issue I am not able to reliably reconnect to the websocket server. Any help regarding this issue would be highly appreciated :)

Here is the source file.

relevant part of fn read_stream:

        let data = match raw_message {
            Ok(d) => match d {
                TungsteniteMessage::Text(t) => {
                    WSMessenger::pong(outbound_tx.clone()).unwrap_or_else(|err| {
                        error!(
                            msg = "failed to send pong to server",
                            err = format!("{err}")
                        );
                    });

                    t
                }
                TungsteniteMessage::Ping(_) | TungsteniteMessage::Pong(_) => {
                    if let Err(err) = WSMessenger::pong(outbound_tx.clone()) {
                        error!(
                            msg = "failed to send pong to server",
                            err = format!("{err}")
                        );
                    }
                    continue;
                }
                TungsteniteMessage::Close(_) => {
                    warn!(msg = "socket connection closed");
                    if let Err(err) = inbound_tx.send(Message::new_messenger_disconnect()) {
                        error!(
                            msg = "failed to send messenger disconnect notification",
                            err = format!("{err}")
                        );
                    }
                    return;
                }
                _ => continue,
            },
            Err(err) => {
                match err {
                    tokio_tungstenite::tungstenite::Error::ConnectionClosed
                    | tokio_tungstenite::tungstenite::Error::AlreadyClosed => {
                        warn!(msg = "socket connection closed already");
                        if let Err(err) = inbound_tx.send(Message::new_messenger_disconnect()) {
                            error!(
                                msg = "failed to send messenger disconnect notification",
                                err = format!("{err}")
                            );
                        }
                        return;
                    }
                    _ => {
                        error!(msg = "socket read err", err = format!("{err}"));
                        continue;
                    }
                };
            }
        };

fn connect:

        async fn connect(
        &mut self,
    ) -> Result<crossbeam_channel::Sender<Message>, Box<dyn std::error::Error>> {
        info!(msg = "establishing websocket connection");
        let (ws_stream, _) = connect_async(&self.url).await?;

        let (tx, rx) = futures_channel::mpsc::unbounded();
        self.outbound_tx = Some(tx.clone());
        let inbound_tx = self.inbound_tx.clone();
        let _tx = tx.clone();

        let (write, read) = ws_stream.split();
        let write_stream = rx.map(Ok).forward(write);
        let read_stream = tokio::spawn(read_stream(read, _tx, inbound_tx.clone()));

        tokio::spawn(async move {
            pin_mut!(write_stream, read_stream);
            future::select(write_stream, read_stream).await;
            if let Err(err) = inbound_tx.send(Message::new_messenger_disconnect()) {
                error!(
                    msg = "failed to send messenger disconnect notification",
                    err = format!("{err}")
                );
            }
        });

        Ok(self.inbound_tx.clone())
    }
daniel-abramov commented 2 weeks ago

I have been having issues with consistently detecting websocket close notifications sent from the server.

If by close notification you mean the server-initiated WebSocket close (the message the server sends to the client), then you only need to monitor the close message (which you do in your code). Note that the close message sent by the server does not signal the fact that the connection is closed but only informs you that the server-initiated closing of the WebSocket connection - at this point, you typically don't drop the task on the client side. Normally you continue using the stream until the error occurs (typically ConnectionClosed) or until the stream returns None (which happens after the connection is closed).

Simply put, if I understood your use case correctly, you probably do not need to monitor the close message. You can just poll the stream until None is returned or until you discover any error (Some(Err(..))). Your current code would end up in an endless loop if the connection closes due to an unexpected error.

Hope this helps!