websockets-rs / rust-websocket

A WebSocket (RFC6455) library written in Rust
http://websockets-rs.github.io/rust-websocket/
MIT License

Add example for async server that is not response based #129

Closed janhohenheim closed 7 years ago

janhohenheim commented 7 years ago

I would love to use the crate.
Unfortunately, all my attempts to modify the example server to send and receive messages independently have failed.
Maybe someone with more experience in these things can help me?

illegalprime commented 7 years ago

I'm at work right now, so I can only give a full example later, but let me try to point you in the right direction. In async-autobahn-client.rs we have:

let test_case = ClientBuilder::new(&url)
    .unwrap()
    .async_connect_insecure(&handle)
    .and_then(move |(duplex, _)| {
        println!("Executing test case: {}/{}", case_id, case_count);
        future::loop_fn(duplex, |stream| {
            stream.into_future()
                .or_else(|(err, stream)| {
                    println!("Could not receive message: {:?}", err);
                    stream.send(OwnedMessage::Close(None)).map(|s| (None, s))
                })
                .and_then(|(msg, stream)| match msg {
                    Some(OwnedMessage::Text(txt)) => {
                        stream.send(OwnedMessage::Text(txt))
                              .map(|s| Loop::Continue(s))
                              .boxed()
                    }
                    Some(OwnedMessage::Binary(bin)) => {
                        stream.send(OwnedMessage::Binary(bin))
                              .map(|s| Loop::Continue(s))
                              .boxed()
                    }
                    Some(OwnedMessage::Ping(data)) => {
                        stream.send(OwnedMessage::Pong(data))
                              .map(|s| Loop::Continue(s))
                              .boxed()
                    }
                    Some(OwnedMessage::Close(_)) => {
                        stream.send(OwnedMessage::Close(None))
                              .map(|_| Loop::Break(()))
                              .boxed()
                    }
                    Some(OwnedMessage::Pong(_)) => {
                        future::ok(Loop::Continue(stream)).boxed()
                    }
                    None => future::ok(Loop::Break(())).boxed(),
                })
        })
    })

You'll notice that every match arm returns a boxed future. That's because sometimes a message gets sent back and sometimes it doesn't (it doesn't in the Pong match arm), so the arms produce different future types, and boxing gives them one common type. If you don't want to always send a response, I would recommend this method.
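To illustrate why the boxing is needed, here is a minimal std-only sketch. It uses iterators instead of futures so that it compiles without any crates; that substitution and the name respond are assumptions made for illustration and are not part of rust-websocket. Each match arm produces a different concrete type, and putting them behind one boxed trait object gives the whole match a single type.

```rust
use std::iter;

// Hypothetical stand-in for the message handler: some inputs produce a
// reply and others produce nothing, so each arm has its own concrete type.
fn respond(msg: Option<&str>) -> Box<dyn Iterator<Item = String>> {
    match msg {
        // Concrete type here: iter::Once<String>.
        Some("ping") => Box::new(iter::once("pong".to_string())),
        // Concrete type here: iter::Take<iter::Repeat<String>>.
        Some(text) => Box::new(iter::repeat(text.to_string()).take(2)),
        // Concrete type here: iter::Empty<String> (nothing to send back).
        None => Box::new(iter::empty()),
    }
}

fn main() {
    // Without the Box, the three arms would not type-check as one match.
    for msg in [Some("ping"), Some("hi"), None] {
        let replies: Vec<String> = respond(msg).collect();
        println!("{:?} -> {:?}", msg, replies);
    }
}
```

The same reasoning applies to the futures above: the arm that sends a message and the arm that only returns future::ok have different types, and .boxed() erases that difference.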

janhohenheim commented 7 years ago

Thank you very much for your response. My use case is that I want to independently send and receive data, i.e. I want to send data constantly and not only when receiving data.

Your code helped me a lot in coming up with this:

extern crate websocket;
extern crate futures;
extern crate tokio_core;

use std::fmt::Debug; // needed for the `E: Debug` bound in spawn_future

use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::client::async::Framed;
use websocket::async::{Server, MessageCodec};

use tokio_core::net::TcpStream;
use tokio_core::reactor::{Handle, Core};

use futures::{Future, Sink, Stream};
use futures::future::{self, Loop};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let server = Server::bind("localhost:8081", &handle).unwrap();

    let f = server.incoming()
        .map_err(|InvalidConnection { error, .. }| error)
        .for_each(|(upgrade, addr)| {
            println!("Got a connection from: {}", addr);
            if !upgrade.protocols().iter().any(|s| s == "rust-websocket") {
                spawn_future(upgrade.reject(), "Upgrade Rejection", &handle);
                return Ok(());
            }

            let f = upgrade
                .use_protocol("rust-websocket")
                .accept()
                .and_then(|(framed, _)| { // Build error here
                    let (sink, stream) = framed.split();
                    let input = future::loop_fn(stream, |stream| {
                        stream
                            .into_future()
                            .and_then(|(msg, stream)|{
                                if let Some(OwnedMessage::Text(txt)) = msg {
                                    println!("received message: {}", txt)
                                }
                                Ok((msg, stream))
                            }).map(|(msg, stream)| {
                                match msg {
                                    Some(_) => Loop::Continue(stream),
                                    None => Loop::Break(()),
                                }
                            })
                    });
                    let output = future::loop_fn(sink, |sink| {
                        sink
                            .send({
                                OwnedMessage::Text("hi!".to_owned())
                            })
                            .map(|sink| {
                                match 1 {
                                    1 => Loop::Continue(sink),
                                    _ => Loop::Break(()) // This line is for type inference
                                }
                            })
                    });
                    input.select2(output)
                });
            spawn_future(f, "Client Status", &handle);
            Ok(())
        });

    core.run(f).unwrap();
}

fn spawn_future<F, I, E>(f: F, desc: &'static str, handle: &Handle)
    where F: Future<Item = I, Error = E> + 'static,
          E: Debug
{
    handle.spawn(f.map_err(move |e| println!("{}: '{:?}'", desc, e))
                     .map(move |_| println!("{}: Finished.", desc)));
}

The idea is that we have two futures, input and output, which run independently until either one finishes.

However, the above code fails to build with:

41 |                 .and_then(|(framed, _)| {
   |                  ^^^^^^^^ expected enum `futures::future::Either`, found enum `websocket::WebSocketError`

Do you have any idea how we could fix this? I think once we get this sorted out, we could use this as an example and close the issue.

janhohenheim commented 7 years ago

I managed to fix it. I created a PR to add it as an example: https://github.com/cyderize/rust-websocket/pull/131
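For anyone hitting the same error: in futures 0.1, select2 resolves to an Either on both the success side and the error side, while and_then requires the closure's future to have the same error type as the future it is chained onto. One way to reconcile the two is to flatten the Either back into the underlying error with map_err. The sketch below models that shape with plain Result and a locally defined Either so that it compiles without any crates. The names Either, WsError, SelectError and flatten_error are hypothetical, and this is not necessarily the fix used in the PR.

```rust
// Local model of futures 0.1's Either, defined here only so that the
// sketch compiles with std alone.
#[derive(Debug)]
enum Either<A, B> {
    A(A),
    B(B),
}

// Hypothetical stand-in for websocket::WebSocketError.
#[derive(Debug, PartialEq)]
struct WsError(&'static str);

// Shape of the error produced by selecting over two halves: the error of
// whichever side failed, paired with the other, still-pending side.
type SelectError<In, Out> = Either<(WsError, Out), (WsError, In)>;

// Drop the still-pending half and keep only the underlying error, so the
// result has the single error type the outer chain expects.
fn flatten_error<In, Out>(err: SelectError<In, Out>) -> WsError {
    match err {
        Either::A((e, _)) => e,
        Either::B((e, _)) => e,
    }
}

fn main() {
    // Pretend the input half failed while the output half was pending.
    let input_failed: Result<(), SelectError<&str, &str>> =
        Err(Either::A((WsError("connection reset"), "pending output")));
    // And the opposite case: the output half failed first.
    let output_failed: Result<(), SelectError<&str, &str>> =
        Err(Either::B((WsError("send failed"), "pending input")));

    // Same move as calling .map_err(..) on the selected future.
    println!("{:?}", input_failed.map_err(flatten_error));
    println!("{:?}", output_failed.map_err(flatten_error));
}
```

Applied to the server above, the equivalent step would be a map_err on the result of input.select2(output) that keeps only the WebSocketError; the success value would still be an Either, which spawn_future ignores anyway.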

janhohenheim commented 7 years ago

Implemented in #131