snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars, 236 forks

Echo server example help: send a string instead of forwarding the message #253

Closed badonyt closed 1 year ago

badonyt commented 1 year ago

Hey, I was trying out tokio-tungstenite

and I got this echo server (example):

// src/main.rs
use std::{env, io::Error};

use futures_util::StreamExt;
use log::info;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> Result<(), Error> {
    let _ = env_logger::try_init();
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());

    // Create the event loop and TCP listener we'll accept connections on.
    let try_socket = TcpListener::bind(&addr).await;
    let listener = try_socket.expect("Failed to bind");
    info!("Listening on: {}", addr);

    while let Ok((stream, _)) = listener.accept().await {
        tokio::spawn(accept_connection(stream));
    }

    Ok(())
}

async fn accept_connection(stream: TcpStream) {
    let addr = stream.peer_addr().expect("connected streams should have a peer address");
    info!("Peer address: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(stream)
        .await
        .expect("Error during the websocket handshake occurred");

    info!("New WebSocket connection: {}", addr);

    let (write, read) = ws_stream.split();
    read.forward(write).await.expect("Failed to forward message")
}

In this line, read.forward(write).await.expect("Failed to forward message"), I want to send a custom string instead of forwarding the received message. How can I do that?

badonyt commented 1 year ago

How can I even get what the client sent?

daniel-abramov commented 1 year ago

You have the read and write counterparts. The read counterpart could be used to read messages (it implements async Stream).

We also have an example that shows how to broadcast a single message to all other connected participants that demonstrates how to read and write messages. And an example of an echo server that also sends messages with an interval to the client.

badonyt commented 1 year ago

Hey, I'm using the example that shows how to broadcast a single message to all other connected participants.

It gives me these errors when I run it: [three screenshots, not preserved]

dependencies:


[dependencies]
futures-channel = "0.3.25"
futures-util = "0.3.25"
tokio = "1.0"
tokio-tungstenite = "0.13.0"
tungstenite = "0.13.0"

daniel-abramov commented 1 year ago

You're using an ancient version of tokio-tungstenite (and tungstenite) that is at least 2 years old. Please, use the latest version.

Also, you don't need to use tungstenite when tokio-tungstenite is in use, since tokio-tungstenite re-exports the types.

badonyt commented 1 year ago

[screenshot not preserved] with these packages:

[dependencies]
futures-channel = "0.3.25"
futures-util = "0.3.25"
tokio = "1.24.2"
tokio-tungstenite = "*"

daniel-abramov commented 1 year ago

You need to enable the features of tokio that you plan to use. There are many of them: https://docs.rs/tokio/latest/tokio/#feature-flags Select those that you intend to use in your application (or enable all if you don't mind or don't know about the API surface that you want to use).

As for the tungstenite::Message, as I mentioned: you may want to use the re-exported types via tokio_tungstenite::tungstenite::Message.

NB: I also would strongly suggest checking the Rust Book to at least understand the basics and getting familiar with Tokio, otherwise you'll find yourself fighting the compiler with each minor simple change.

badonyt commented 1 year ago

I bought the Rust Book (I know it's free, but I prefer the physical copy).

badonyt commented 1 year ago

Hey, I'm new to Rust, and this will sound stupid, but how do I enable all the features in Cargo.toml? Something like tokio = { version = "1.24.2", default-features = true }, but enabling all of them.

daniel-abramov commented 1 year ago

There is generally speaking no way to enable all features at once without enumerating them because features may be mutually exclusive (e.g. TLS support in tokio-tungstenite where we support different TLS backends and the user has a choice depending on their needs).

Tokio does have a feature called full, though, which enables all the features.
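
For illustration, a Cargo.toml fragment along these lines would enable only what the server code in this thread appears to need. The feature names are real Tokio features; which ones a given program needs is an assumption based on the calls used here (#[tokio::main], TcpListener, and optionally timers).

```toml
[dependencies]
# Only the parts of Tokio the server code in this thread touches:
#   macros + rt-multi-thread -> #[tokio::main]
#   net                      -> TcpListener / TcpStream
#   time                     -> tokio::time::interval (only needed for timers)
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "time"] }

# Or, while learning, turn everything on:
# tokio = { version = "1.24.2", features = ["full"] }
```

Listing features explicitly keeps compile times and the dependency tree smaller; full is the convenient choice when you are not yet sure which parts of the API you will use.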

badonyt commented 1 year ago

You told me to remove tungstenite, but now it's asking for it: [screenshot not preserved]

badonyt commented 1 year ago

My dependencies look like this, btw:

[dependencies]
futures-channel = "0.3.25"
futures-util = "0.3.25"
tokio = { version = "1.24.2", features = ["full"] }
tokio-tungstenite = "*"

daniel-abramov commented 1 year ago

https://github.com/snapview/tokio-tungstenite/issues/253#issuecomment-1396114417:

As for the tungstenite::Message, as I mentioned: you may want to use the re-exported types via tokio_tungstenite::tungstenite::Message.

badonyt commented 1 year ago

You have the read and write counterparts. The read counterpart could be used to read messages (it implements async Stream).

We also have an example that shows how to broadcast a single message to all other connected participants that demonstrates how to read and write messages. And an example of an echo server that also sends messages with an interval to the client.

Printing write with println!("{:?}", write); gives me nonsense [screenshot not preserved]. Any idea how to print wht he sent? Also, how do I send global messages?

badonyt commented 1 year ago

*what

daniel-abramov commented 1 year ago

It gives "nonsense" because you're trying to print the Debug implementation of the sink, not the message. The example I shared in the previous messages has a sample code which prints incoming messages.

I would also like to ask you to please use GitHub issues for discussing, reporting, or asking about matters related to tungstenite / tokio-tungstenite. So far, all your comments and questions have been basic questions about the fundamentals of Rust and Tokio (i.e. they were not related to WebSockets or tungstenite at all). A library's GitHub issues are not the right place (or the right format, if you will) for such chat-like discussions, as they create a lot of noise (off-topic communication) and send notifications to everyone subscribed to the repository.

If you want to discuss the basics of Rust or have problems navigating the Rust Book or the documentation, please use the community resources where you could ask for help (though to be honest, I'm pretty sure that after going through the Rust Book and Tokio docs, you'll find all the answers).

However, if you have any issues or questions related to tungstenite, feel free to create one and we'll try to properly classify and address it as soon as we can.

badonyt commented 1 year ago

OK, sorry for the disturbance. I'm reading the book now (it finally arrived; I know it's free online, but I prefer the physical copy). And I understood the code. Thanks!

badonyt commented 1 year ago

(code below) OK, so I believe this is a valid question. I want to send a message to all clients all the time (in a loop), but I can't figure it out. 1) I tried putting it in the main function inside the while let Ok((stream, addr)) ... loop, but it wasn't always running. 2) I tried making a separate loop in main, but I needed the addr from let Ok((stream, addr)), which I couldn't figure out how to get.

//! A chat server that broadcasts a message to all connections.
//!
//! This is a simple line-based server which accepts WebSocket connections,
//! reads lines from those connections, and broadcasts the lines to all other
//! connected clients.
//!
//! You can test this out by running:
//!
//!     cargo run --example server 127.0.0.1:12345
//!
//! And then in another window run:
//!
//!     cargo run --example client ws://127.0.0.1:12345/
//!
//! You can run the second command in multiple windows and then chat between the
//! two, seeing the messages from the other client as they're received. For all
//! connected clients they'll all join the same room and see everyone else's
//! messages.

use std::{
    collections::HashMap,
    env,
    io::Error as IoError,
    net::SocketAddr,
    sync::{Arc, Mutex},
};

use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};

use tokio::net::{TcpListener, TcpStream};

use tokio_tungstenite::tungstenite::protocol::Message;
type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;

async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    // Insert the write part of this peer to the peer map.
    let (tx, rx) = unbounded();
    peer_map.lock().unwrap().insert(addr, tx);

    let (outgoing, incoming) = ws_stream.split();

    let broadcast_incoming = incoming.try_for_each(|msg| {
        println!("Received a message from {}: {}", addr, msg.to_text().unwrap());
        let peers = peer_map.lock().unwrap();

        // Send the fixed reply to every connected peer, including the sender.
        let broadcast_recipients =
            peers.iter().map(|(_, ws_sink)| ws_sink);

        for recp in broadcast_recipients {
            recp.unbounded_send("well".into()).unwrap();
        }

        future::ok(())
    });

    let receive_from_others = rx.map(Ok).forward(outgoing);

    pin_mut!(broadcast_incoming, receive_from_others);
    future::select(broadcast_incoming, receive_from_others).await;

    println!("{} disconnected", &addr);
    peer_map.lock().unwrap().remove(&addr);
}

#[tokio::main]
async fn main() -> Result<(), IoError> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());

    let state = PeerMap::new(Mutex::new(HashMap::new()));

    // Create the event loop and TCP listener we'll accept connections on.
    let try_socket = TcpListener::bind(&addr).await;
    let listener = try_socket.expect("Failed to bind");
    println!("Listening on: {}", addr);

    // Let's spawn the handling of each connection in a separate task.
    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(state.clone(), stream, addr));

    }

    Ok(())
}

badonyt commented 1 year ago

By "constantly" I mean always sending a message.