snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars 236 forks source link

Get client message using channel #239

Closed LimpidCrypto closed 2 years ago

LimpidCrypto commented 2 years ago

I am pretty new to asnyc programming with rust. The example client simply sends incoming messages to stdout. But I wanted to work with each message (e.g. push it to a Vec). Unfortunately you cannot move the message outside the .for_each (at least I did not find a way). So here is a way to receive each message using a channel so that you can do whatever you want with it. Ignore how I treat errors here, it's just to get an idea how it can be done.

use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use futures_channel::mpsc::{unbounded, UnboundedSender};
use tokio_tungstenite::{
    tungstenite::protocol::Message, tungstenite::Error, WebSocketStream, MaybeTlsStream, connect_async,
};

struct AsyncWebsocketClient<'a> {
    url: &'a str,
}

impl AsyncWebsocketClient<'static> {
    pub async fn open(&self) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
        let ws_stream = connect_async(self.url).await;
        match ws_stream {
            Ok((result, _)) => Ok(result),
            Err(error) => Err(error),
        }
    }

    pub async fn send(&mut self, message: Message, sender: UnboundedSender<Message>) -> Result<(), ()>{
        let ws_stream = self.open().await;
        match ws_stream {
            Ok(mut ws_stream) => {
                let _result = ws_stream.send(message).await;
                while let Some(msg) = ws_stream.next().await {
                    match msg {
                        Ok(message) => {
                            let _ = sender.unbounded_send(message);
                        },
                        Err(_error) => continue,
                    }
                }
                Ok(())
            },
            Err(_) => Err(()),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut client = AsyncWebsocketClient {
        url: "wss://xrplcluster.com/",
    };
    let (sender, mut receiver) = unbounded();
    tokio::spawn(async move {
        while let Some(msg) = receiver.next().await {
            // Do anything with the message.
            println!("{:?}", msg);
        }
    });
    let _send = client.send(
        Message::Text(
            r#"{
                "id": "Example watch for new validated ledgers",
                "command": "subscribe",
                "streams": ["ledger"]
            }"#
            .to_string()
        ),
        sender
    ).await;
}

Also you can ignore that these are struct methods. It should be possible by simply using them as functions and adding a url arg to them.

LimpidCrypto commented 2 years ago

Just an example