snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars 236 forks

Possible to await connection open and then keep the connection open in another Thread? #264

Closed (your-sudden-death closed 1 year ago)

your-sudden-death commented 1 year ago

Hi everyone, do you know if there is a way to open the connection in blocking mode using await, catch errors during the connection process, and then hand off the connection to be held open by an async thread (e.g. tokio::spawn) while receiving messages through a callback function? My first try was doing it like this:

/// Connect to Mailbox if not connected, received messages are passed to the callback
pub async fn connect<F>(
    &mut self,
    received_message_callback: F,
    timeout: Duration,
) -> Result<(), LibPolyError>
where
    F: Fn(Message) -> (),
{
    match self.connection_state {
        ConnectionState::Disconnected {
            last_connection_time: _,
        } => {
            let mut root_store = rustls::RootCertStore::empty();
            root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(
                |ta| {
                    rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
                        ta.subject,
                        ta.spki,
                        ta.name_constraints,
                    )
                },
            ));
            let config = match &self.trusted_certificate {
                Some(trusted_certificate) => ClientConfig::builder()
                    .with_safe_defaults()
                    .with_custom_certificate_verifier(Arc::new(SelfSignedVerifier {
                        trusted_certificate: trusted_certificate.clone(),
                        pki_verifier: WebPkiVerifier::new(root_store, None),
                    }))
                    .with_no_client_auth(),
                None => ClientConfig::builder()
                    .with_safe_defaults()
                    .with_root_certificates(root_store)
                    .with_no_client_auth(),
            };
            let arc_config = Arc::new(config);

            let maybe_future_stream = timeout_at(
                Instant::now() + timeout,
                connect_async_tls_with_config(
                    self.url.clone(),
                    None,
                    Some(Connector::Rustls(arc_config.to_owned())),
                ),
            )
            .await;

            let (ws_stream, _) = match maybe_future_stream {
                Ok(i) => match i {
                    Ok(v) => v,
                    Err(e) => {
                        info!("failed to connect to {}, {:?}", self.url.to_string(), e);
                        return Err(LibPolyError::ConnectionError(
                            self.url.to_string(),
                            e.to_string(),
                        ));
                    }
                },
                Err(e) => {
                    info!(
                        "timed out while trying to reach {}: {:?}",
                        self.url.to_string(),
                        e
                    );
                    return Err(LibPolyError::HostUnreachable(
                        self.url.to_string(),
                        e.to_string(),
                    ));
                }
            };

            let (tx, rx) = futures_channel::mpsc::unbounded();
            let (write, read) = ws_stream.split();

            let rx_to_ws = rx.map(Ok).forward(write);
            let ws_to_tx = {
                read.for_each(|message| async {
                    received_message_callback(message.unwrap());
                })
            };

            pin_mut!(rx_to_ws, ws_to_tx);

            self.connection_state = ConnectionState::Connected { sink: tx };
            info!("connected established to {}", self.url.to_string());

            let _ = future::select(rx_to_ws, ws_to_tx).then(|_| async move {
                info!("closed connection to {}", self.url.to_string());
                self.connection_state = ConnectionState::Disconnected {
                    last_connection_time: Some(current_time_millis()),
                };
            });

            return Ok(());
        }
        _ => return Ok(()),
    };
}

but this causes the connection to be dropped immediately, and the .then block is never executed. If I do it the way it is described in the client example, the call needs to await the select and block until the connection is closed, which is suboptimal.

Thanks in advance

daniel-abramov commented 1 year ago

do you know if there is a way to open the connection in blocking mode using await, catch errors during the connection process, and then hand off the connection to be held open by an async thread (e.g. tokio::spawn) while receiving messages through a callback function?

I don't understand what you're trying to achieve. tokio-tungstenite is not "blocking", it's future-based. When you connect, you get back a future that you're supposed to poll. The future resolves once the connection is established, and then you can process the messages in whatever way you want (whether you need a callback or not).

The code that you posted is unfortunately overly complicated: most of it is unrelated to the issue you're describing, and only a small section of it relates to tokio-tungstenite. In that branch you create a select future and drop it right away (let _ = ...) instead of awaiting it. Futures in Rust do nothing until they are polled, so the select never runs and the stream halves it owns are dropped along with it. That's the reason why it behaves the way you described ("but this causes the connection to be dropped immediately").

In other words, if you want the select future to make progress, you need to await it instead of dropping it right away.
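To illustrate the point about dropped futures, here is a minimal sketch that uses only the standard library. Everything in it (block_on, NoopWake, mark_ran, create_and_drop, create_and_drive) is a hypothetical helper written for this illustration and is not part of tokio or tokio-tungstenite. The block_on here is a toy executor that only suits futures which complete without waiting on anything; in real code the Tokio runtime does this job.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls `fut` in a loop until it completes.
/// (Assumption: a stand-in for a real runtime, for illustration only.)
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Sets the flag, but only once the returned future is actually polled.
async fn mark_ran(flag: Rc<Cell<bool>>) {
    flag.set(true);
}

/// Same shape as `let _ = future::select(..).then(..)`:
/// the future is created and dropped without ever being polled.
fn create_and_drop() -> bool {
    let flag = Rc::new(Cell::new(false));
    let _ = mark_ran(flag.clone()); // dropped here, body never runs
    flag.get()
}

/// The future is driven to completion, so its body runs.
fn create_and_drive() -> bool {
    let flag = Rc::new(Cell::new(false));
    block_on(mark_ran(flag.clone()));
    flag.get()
}
```

Calling create_and_drop() returns false because the body of mark_ran never executes, while create_and_drive() returns true. The .then block in the question is in the first situation.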

For more details on why this is important, please check the Tokio tutorial (https://tokio.rs/tokio/tutorial). It explains in depth how to organise code so that it works properly with futures.
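As for the pattern asked about in the question (await the connection so that errors reach the caller, then let a background task keep the connection open and call a callback), a rough sketch under stated assumptions could look like the following. It uses only the standard library so that it is self-contained: FakeStream, connect, read_loop, connect_and_spawn, run_demo, block_on and NoopWake are all hypothetical names invented for this illustration, and std::thread::spawn stands in for tokio::spawn. With tokio-tungstenite, the read half returned by ws_stream.split() would take the place of FakeStream.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::mpsc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (assumption: stands in for the Tokio runtime).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for the read half of a WebSocket stream.
struct FakeStream {
    incoming: Vec<String>,
}

/// Hypothetical connect step; an empty URL fails so the error path is visible.
async fn connect(url: &str) -> Result<FakeStream, String> {
    if url.is_empty() {
        return Err("empty url".to_string());
    }
    Ok(FakeStream {
        incoming: vec!["hello".to_string(), "world".to_string()],
    })
}

/// Read loop that owns both the stream and the callback.
async fn read_loop<F: Fn(String)>(stream: FakeStream, on_message: F) {
    for msg in stream.incoming {
        on_message(msg);
    }
}

/// Awaits the connection (errors go back to the caller), then hands the
/// stream to a background worker and returns immediately.
async fn connect_and_spawn<F>(url: &str, on_message: F) -> Result<JoinHandle<()>, String>
where
    F: Fn(String) + Send + 'static,
{
    let stream = connect(url).await?;
    // With Tokio this line would be `tokio::spawn(read_loop(stream, on_message))`.
    Ok(thread::spawn(move || block_on(read_loop(stream, on_message))))
}

/// Usage example: collects everything the callback receives.
fn run_demo(url: &str) -> Result<Vec<String>, String> {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = block_on(connect_and_spawn(url, move |msg| {
        let _ = tx.send(msg);
    }))?;
    handle.join().map_err(|_| "reader panicked".to_string())?;
    Ok(rx.iter().collect())
}
```

The important design point is that the spawned work owns everything it needs. tokio::spawn requires the future to be Send + 'static, so the task cannot borrow &mut self the way the .then closure in the question does. State such as connection_state would presumably have to be shared through something like an Arc<Mutex<..>> or reported back over a channel.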