websockets-rs / rust-websocket

A WebSocket (RFC6455) library written in Rust
http://websockets-rs.github.io/rust-websocket/
MIT License

Async server use needs more documentation #170

Open anlumo opened 6 years ago

anlumo commented 6 years ago

Hello,

I'm moderately experienced with Rust and trying to get an async websocket server to run using this crate. I'm really having a hard time wrapping my head around this, and there's no documentation for this other than a single far too simplistic example and the reference documentation that's nearly unreadable due to all of the async stuff using types that go all over the place. The error messages I get from rustc are larger than my screen and remind me of my heavily templated C++ past.

For example, even the most basic thing of sending a text message via the websocket is not really explained anywhere.

What I found out about the example code is this:

stream.take_while(|m| Ok(!m.is_close()))

This is like the beginning of an async while loop.

.filter_map(|m| {

Receive one message from the websocket, and optionally transform the object to something else (why?).

match m {
    OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
    OwnedMessage::Pong(_) => None,
    _ => Some(m),
}

Convert ping to pong, pong to not sending anything and pass along anything else. I don't see anything here actually sending a message, though. This just generates packets.

.forward(sink)

This I assume is the end of the while loop (I'm not 100% sure, that's just a hunch). I have no idea where that OwnedMessage goes to that's generated in the previous async block. It also doesn't explain how to send a message that's not a direct response to a message on the websocket.

.and_then(|(_, sink)| {sink.send(OwnedMessage::Close(None))})

When the take_while/forward-loop is done, close the socket.
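
To make the data flow concrete, here is a synchronous analogy that uses only the standard library. It is a sketch, not the crate's API: `Msg` is a hypothetical stand-in for `OwnedMessage`, a plain iterator plays the stream, and a `Vec` plays the sink. It suggests that `forward(sink)` is the step that actually sends: every item that survives `filter_map` is written to the sink, so the `Some(m)` branch ends up echoing the message back.

```rust
// Hypothetical stand-in for the crate's `OwnedMessage`; only the variants needed here.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Text(String),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close,
}

// Synchronous analogy of `stream.take_while(..).filter_map(..).forward(sink)`.
// `incoming` plays the role of the stream, the returned Vec the role of the sink.
fn echo_pipeline(incoming: Vec<Msg>) -> Vec<Msg> {
    let mut sink = Vec::new();
    incoming
        .into_iter()
        // Stop at the first Close message (the "loop condition").
        .take_while(|m| *m != Msg::Close)
        // Decide what, if anything, to send back for each incoming message.
        .filter_map(|m| match m {
            Msg::Ping(p) => Some(Msg::Pong(p)), // answer pings with pongs
            Msg::Pong(_) => None,               // send nothing for pongs
            other => Some(other),               // echo everything else
        })
        // The `forward(sink)` step: each surviving item is written to the sink.
        .for_each(|m| sink.push(m));
    // The `and_then` step: once the loop has ended, send a final Close.
    sink.push(Msg::Close);
    sink
}
```

Feeding it a text message, a ping, a pong and a close yields the text, a pong and a close; anything arriving after the close is never read.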

I can't believe I'm the only one who has problems wrapping his head around this undocumented code. I've also been unable to find any source code online that's using this crate to create an async websocket server, which might be related to this issue.

Could you please write up some documentation and maybe even write a more thorough example? A simple chat server for example, where you can send and receive messages in a group chat.

Aerolivier commented 6 years ago

Yes, I am having the exact same problems.

I've had to read into Tokio and now have a vague-ish understanding of how it works.

Sending messages in direct response to incoming messages seems simple enough, but sending delayed or spontaneous messages isn't going so well. I'm trying to piece together the async-server example with this Tokio example: https://github.com/tokio-rs/tokio/blob/master/examples/chat.rs, but I'm still running into problems that produce type errors 1400 characters long, I think related to the error type. (With type errors that long, it's hard to work out exactly what I need to change.)

The technique seems to be to use a Tokio mpsc channel pair and attach it to each socket, but I'm still scratching my head for now. A documented example would be extremely useful, and much appreciated, if anyone can figure out how to do it.

anlumo commented 6 years ago

You sometimes need to get rid of error types (convert them to ()) via map_err.
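
As a rough illustration of why `map_err` helps, the same idea applies to plain `Result` values: two steps with different error types cannot be chained with `and_then` until their errors are mapped to a common type. The sketch below uses only the standard library, and `parse_port`, `check_range` and `port_from_str` are hypothetical helpers. The `and_then` combinator in futures 0.1 likewise requires both futures to share an error type.

```rust
use std::num::ParseIntError;

// Hypothetical step 1: fails with a `ParseIntError`.
fn parse_port(s: &str) -> Result<u32, ParseIntError> {
    s.trim().parse::<u32>()
}

// Hypothetical step 2: fails with a `String` error.
fn check_range(port: u32) -> Result<u16, String> {
    if port <= 65535 {
        Ok(port as u16)
    } else {
        Err(format!("port {} out of range", port))
    }
}

// Both error types are discarded with `map_err(|_| ())`, so the two steps
// agree on the error type `()` and can be chained with `and_then`.
fn port_from_str(s: &str) -> Result<u16, ()> {
    parse_port(s)
        .map_err(|_| ())
        .and_then(|p| check_range(p).map_err(|_| ()))
}
```

Discarding the error like this loses the reason for the failure, so logging it inside the `map_err` closure first is usually worth the extra line.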

anlumo commented 6 years ago

I think I've conquered the websocket futures. The trick is to create a channel and link it to the websocket like this:

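let f = upgrade.accept().and_then(move |(s, _)| {
  // Channel for outgoing messages: anything sent on tx is written to this websocket.
  let (tx, rx) = mpsc::channel(8);
  let (sink, stream) = s.split();

  // Writer task: forward everything from the channel into the websocket sink,
  // then send Close once the channel has ended.
  let writer = rx
      .map_err(|_| WebSocketError::ProtocolError("Can't happen"))
      .forward(sink)
      .and_then(|(_, sink)| sink.send(OwnedMessage::Close(None)))
      .map_err(|err| {
          error!("{}", err);
      })
      .map(|_| {});
  // handle_inner is presumably a clone of handle; it is not shown in this snippet.
  handle_inner.spawn(writer);

  // Reader task: process incoming messages until the peer sends Close.
  stream.take_while(|m| Ok(!m.is_close()))
      .for_each(move |m| {
          // Clone of the sender, to hand to whatever code wants to send a message.
          let tx_inner = tx.clone();
          println!("Incoming message: {:?}", m);
          // handle message here
          Ok(())
      })
});
spawn_future(f, "Websocket", &handle);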

Now, to send a message to that websocket from anywhere, you need a reference to tx and handle (both have a clone() function, so you can pass them around everywhere) and do this:

handle_inner.spawn(tx_inner.send(OwnedMessage::Text("OK!".to_string())).map_err(|err| {
    error!("{}", err);
}).map(|_| {}));

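You have to consume both the error (here the channel's send error rather than a WebSocketError, since tx_inner is the channel sender) and the regular result (which is just the Sink handed back, and useless here), because spawn expects a Future with both Item and Error as ().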

Note that for proper socket closing, all tx clones have to go out of scope once the Future created from stream is finished.
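
The closing rule can be seen in a thread-based analogy built on `std::sync::mpsc`. This is an assumption made for illustration: the code above uses a futures channel, though its receiver also ends only after all senders are gone. In the sketch, `run_writer` is a hypothetical name and a `Vec<String>` stands in for the socket. The writer loop finishes, and the "close" is sent, only once every sender clone has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

// Spawns a "writer" that drains the channel into a Vec standing in for the socket,
// and returns everything that was written once the channel has closed.
fn run_writer(messages: Vec<&'static str>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Writer side: roughly `rx.forward(sink)` followed by sending Close.
    let writer = thread::spawn(move || {
        let mut socket = Vec::new();
        // This loop ends only when all Sender clones have been dropped.
        for msg in rx {
            socket.push(msg);
        }
        socket.push("<close>".to_string());
        socket
    });

    // Any part of the program holding a clone of `tx` can send.
    for m in messages {
        let tx_inner = tx.clone();
        tx_inner
            .send(m.to_string())
            .expect("writer should still be receiving");
        // `tx_inner` is dropped at the end of each iteration.
    }

    // Drop the last sender; without this the writer would block forever.
    drop(tx);
    writer.join().expect("writer thread panicked")
}
```

Removing the `drop(tx)` line makes `writer.join()` hang, which is the synchronous counterpart of a websocket that never gets its Close frame because a stray `tx` clone is still alive somewhere.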