szarykott / signalrs

SignalR protocol implementation in Rust

How to convert TcpStream into connection for Server? #11

Open brandonros opened 10 months ago

brandonros commented 10 months ago

Here is an example from similar WebSocket code, unrelated to signalrs:

pub async fn start(addr: &str, message_handler: Arc<Box<dyn MessageHandler>>) -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind(&addr).await?;
    log::info!("Listening on: {addr}");
    // Accept clients until accept() fails; each client is handled in its own task.
    while let Ok((client_stream, socket_addr)) = listener.accept().await {
        let message_handler_clone = message_handler.clone();
        async_std::task::spawn(async move {
            WebSocketServer::handle_client(client_stream, socket_addr, message_handler_clone).await;
        });
    }
    // The loop exits only when accept() returns an error, so report that instead of panicking.
    Err("listener stopped accepting connections".into())
}

Is the signalrs-next Server + Hub implementation ready to support something like this, or is the idea to abstract away the WebSocket part and feed it channels? Even an example showing how to do that from a TcpStream would be interesting and helpful for me and others.
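To make the "feed it channels" idea concrete, here is a minimal sketch that uses only the standard library. It is not the signalrs-next API: `ToyHub`, its `method` and `spawn` functions, and the `"<method> <a> <b>"` text format are all hypothetical, made up for illustration. The point is only that the hub sees text in and text out, so any transport (a WebSocket built on a TcpStream, for example) can sit on the other end of the two channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical handler type: two integer arguments in, one integer out.
type Handler = Box<dyn Fn(i32, i32) -> i32 + Send>;

/// Hypothetical transport-agnostic hub (not part of signalrs-next).
pub struct ToyHub {
    methods: HashMap<String, Handler>,
}

impl ToyHub {
    pub fn new() -> Self {
        ToyHub { methods: HashMap::new() }
    }

    /// Registers a handler under `name`, builder style.
    pub fn method(mut self, name: &str, handler: impl Fn(i32, i32) -> i32 + Send + 'static) -> Self {
        self.methods.insert(name.to_string(), Box::new(handler));
        self
    }

    /// Handles one request in the made-up form "<method> <a> <b>".
    fn invoke(&self, request: &str) -> String {
        let parts: Vec<&str> = request.split_whitespace().collect();
        match parts.as_slice() {
            [name, a, b] => match (self.methods.get(*name), a.parse::<i32>(), b.parse::<i32>()) {
                (Some(handler), Ok(a), Ok(b)) => handler(a, b).to_string(),
                (None, _, _) => format!("error: unknown method {name}"),
                _ => "error: bad arguments".to_string(),
            },
            _ => "error: malformed request".to_string(),
        }
    }

    /// Moves the hub onto its own thread and returns the two channel ends
    /// a transport would use: a sender for requests, a receiver for responses.
    pub fn spawn(self) -> (Sender<String>, Receiver<String>) {
        let (request_tx, request_rx) = channel::<String>();
        let (response_tx, response_rx) = channel::<String>();
        thread::spawn(move || {
            // The loop ends once every request sender has been dropped.
            for request in request_rx {
                if response_tx.send(self.invoke(&request)).is_err() {
                    break; // the response receiver is gone
                }
            }
        });
        (request_tx, response_rx)
    }
}
```

A transport task would push each incoming text frame into the request sender and write whatever arrives on the response receiver back to the client. With an `add` handler registered, sending `"add 1 2"` yields `"3"`.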

brandonros commented 10 months ago

use std::{error::Error, sync::Arc};

use async_std::net::TcpListener;
use async_tungstenite::tungstenite::Message;
use flume::unbounded;
use futures::{StreamExt, TryStreamExt};
use signalrs_next::{server::{hub::builder::HubBuilder, Server, response::ResponseSink}, protocol::Invocation};

async fn add_handler(a: i32, b: i32) -> i32 {
    a + b
}

async fn start(addr: &str) -> Result<(), Box<dyn Error>> {
    let signalr_hub = HubBuilder::new().method("add", add_handler).build();
    let signalr_server = Server::from(signalr_hub);
    let signalr_server = Arc::new(signalr_server);
    let listener = TcpListener::bind(&addr).await?;
    log::info!("Listening on: {addr}");
    while let Ok((client_tcp_stream, socket_addr)) = listener.accept().await {
        let signalr_server = signalr_server.clone();
        async_std::task::spawn(async move {
            let client_ws_stream = async_tungstenite::accept_async(client_tcp_stream)
                .await
                .expect("Error during the websocket handshake occurred");
            let (client_sink, mut client_stream) = client_ws_stream.split();
            // rx from client
            let incoming_future = Box::pin(async move {
                loop {
                    match client_stream.try_next().await {
                        Ok(msg) => match msg {
                            Some(msg) => match msg {
                                Message::Text(msg) => {
                                    // TODO: handle msg -> arguments
                                    let (tx, rx) = unbounded();
                                    let invocation = Invocation::with_id("123", "add", Some((1i32, 2i32)));
                                    let response_sink = ResponseSink::new(tx.into_sink());
                                    signalr_server
                                        .invoke_text(serde_json::to_string(&invocation).unwrap(), Default::default(), response_sink)
                                        .await
                                        .unwrap();
                                    let hub_response = rx.recv_async().await.unwrap();
                                    let text = hub_response.unwrap_text();
                                    // TODO: handle text -> response?
                                },
                                Message::Binary(_) => todo!(),
                                Message::Ping(_) => todo!(),
                                Message::Pong(_) => todo!(),
                                Message::Close(_) => todo!(),
                                Message::Frame(_) => todo!(),
                            },
                            None => {
                                log::error!("received None");
                                break;
                            }
                        },
                        Err(err) => {
                            log::error!("{err}");
                            break;
                        }
                    }
                }
            });
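            // rx from channel -> tx to client
            // TODO: forward hub responses to `client_sink`. Until then, use a future that never
            // completes: an empty `async {}` finishes at once, so `select` would return
            // immediately and drop the incoming loop.
            let outgoing_future = Box::pin(futures::future::pending::<()>());
            // poll both futures concurrently until either one finishes
            futures::future::select(outgoing_future, incoming_future).await;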
        });
    }
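    // The loop exits only when accept() returns an error, so report that instead of panicking.
    Err("listener stopped accepting connections".into())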
}

I think this is what we want? Did I do something wrong?
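One detail behind the two TODOs above: in the SignalR JSON hub protocol, every message ends with the ASCII record separator (0x1E), so a text frame may carry more than one message, and each outgoing message needs the separator appended. The sketch below shows that framing with the standard library only. `split_records` and `frame_record` are hypothetical helper names, and whether signalrs-next's `invoke_text` expects the separator to be stripped beforehand is an assumption I have not verified.

```rust
/// ASCII record separator (0x1E), which terminates each message in the
/// SignalR JSON hub protocol.
const RECORD_SEPARATOR: char = '\u{1e}';

/// Hypothetical helper: splits a text frame into the complete messages it
/// contains. Whatever follows the last separator is an incomplete message
/// and is returned as the second element so the caller can buffer it.
pub fn split_records(frame: &str) -> (Vec<&str>, &str) {
    match frame.rfind(RECORD_SEPARATOR) {
        Some(last) => {
            let complete = &frame[..last];
            let rest = &frame[last + RECORD_SEPARATOR.len_utf8()..];
            let records = complete
                .split(RECORD_SEPARATOR)
                .filter(|record| !record.is_empty())
                .collect();
            (records, rest)
        }
        // No separator at all: nothing is complete yet.
        None => (Vec::new(), frame),
    }
}

/// Hypothetical helper: appends the record separator to an outgoing JSON message.
pub fn frame_record(json: &str) -> String {
    let mut out = String::with_capacity(json.len() + RECORD_SEPARATOR.len_utf8());
    out.push_str(json);
    out.push(RECORD_SEPARATOR);
    out
}
```

In the loop above, each record returned by `split_records` would be a candidate for the text passed to the hub, and each hub response would go through `frame_record` before being sent to the client.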

szarykott commented 10 months ago

Whooah!

I am thrilled that somebody took a look at signalrs-next! However, this crate is not published, and I am afraid it is a bit rusty. I do not plan to release it in the foreseeable future. That said, if you plan to use it in one form or another, I can provide some guidance, or at least tell you whether what you want is possible.