snapview / tungstenite-rs

Lightweight stream-based WebSocket implementation for Rust.
Apache License 2.0

How to implement ping timeouts with tungstenite::server? #14

Closed bbigras closed 7 years ago

bbigras commented 7 years ago

Do I need to use a Read + Write object with tungstenite::server::accept, as in https://github.com/snapview/tungstenite-rs/issues/11#issuecomment-293187432? Does anyone have sample code for that?

I want to implement ping timeouts but still be able to use websocket.read_message(), which blocks.

agalakhov commented 7 years ago

What is your reaction to a ping timeout? In WebSocket, a ping timeout means either a broken network connection or a serious violation of the WebSocket protocol by the peer (RFC 6455 requires sending the pong as soon as practical after receiving a ping). Do you have any reason NOT to close a network connection on which even ping doesn't work? As the simplest solution, I suggest just setting the corresponding TCP timeout on the connection (see the net2 crate) so that it will be closed automatically if there are no pings. This will unblock read_message() with a corresponding error return value.

If, for any reason, you don't want to do so, then you really need a non-blocking stream instead of the standard TcpStream. If so, use tokio-tungstenite instead. It has an example of a non-blocking server.
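
The timeout suggestion above can be sketched with the standard library alone. This is a minimal illustration, not tungstenite code: it assumes std's `TcpStream::set_read_timeout` in place of the net2 helper, uses a plain `TcpStream` where a WebSocket would be, and `read_with_timeout` is a hypothetical helper name. It only shows that a blocking read returns with an error once the timeout elapses.

```rust
use std::io::{ErrorKind, Read};
use std::net::{TcpListener, TcpStream};
use std::time::{Duration, Instant};

/// Performs one blocking read that gives up after `timeout`.
/// Returns Ok(None) if the peer stayed silent for the whole timeout,
/// and Ok(Some(0)) if the peer closed the connection.
fn read_with_timeout(
    stream: &mut TcpStream,
    timeout: Duration,
) -> std::io::Result<Option<usize>> {
    // `timeout` must be non-zero; std rejects a zero duration.
    stream.set_read_timeout(Some(timeout))?;
    let mut buf = [0u8; 1024];
    match stream.read(&mut buf) {
        Ok(n) => Ok(Some(n)),
        // An elapsed timeout surfaces as WouldBlock or TimedOut,
        // depending on the platform.
        Err(e) if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => {
            Ok(None)
        }
        Err(e) => Err(e),
    }
}

fn main() -> std::io::Result<()> {
    // Loopback pair: the client connects but never sends anything.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let _client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server_side, _) = listener.accept()?;

    let start = Instant::now();
    let result = read_with_timeout(&mut server_side, Duration::from_millis(200))?;
    println!("read returned {:?} after {:?}", result, start.elapsed());
    Ok(())
}
```

A caller that wants to drop dead connections would treat `Ok(None)` as the signal to close the socket.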

bbigras commented 7 years ago

> What is your reaction to a ping timeout?

Close and restart the connection.

> I suggest just setting the corresponding TCP timeout on the connection (see the net2 crate) so that it will be closed automatically if there are no pings. This will unblock read_message() with a corresponding error return value.

Thanks. I'll try it.

> If, for any reason, you don't want to do so, then you really need a non-blocking stream instead of the standard TcpStream. If so, use tokio-tungstenite instead.

tokio-tungstenite doesn't support pings yet (see https://github.com/snapview/tokio-tungstenite/issues/6) and I can't afford to wait any longer.

agalakhov commented 7 years ago

> Close and restart the connection.

Then setting the network timeout IS the correct way of doing things. The ping-pong behavior is just used to make sure the channel is alive, or else the timeout will be triggered. This is core TCP functionality, and WebSocket is not special. No data received => close, regardless of the nature of the data. If you're sending normal messages all the time, you don't need pings at all. Just the TCP connection will do. In fact, you only need ping if you have no other messages to send. WebSocket is not UDP, and pings are NOT for checking the connection - they are to keep the connection from being closed by the OS!
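
The "no data received => close" rule above can also be tracked by hand when a socket-level timeout is not convenient. The sketch below is an illustration only: `IdleTimer` is a hypothetical helper, not part of tungstenite, and the 30-second limit is an arbitrary assumption.

```rust
use std::time::{Duration, Instant};

/// Tracks how long a connection has been silent (hypothetical helper).
struct IdleTimer {
    last_seen: Instant,
    limit: Duration,
}

impl IdleTimer {
    fn new(now: Instant, limit: Duration) -> Self {
        IdleTimer { last_seen: now, limit }
    }

    /// Call whenever anything arrives: a ping, a pong, or an ordinary message.
    fn touch(&mut self, now: Instant) {
        self.last_seen = now;
    }

    /// True once the peer has been silent for at least `limit`.
    fn expired(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_seen) >= self.limit
    }
}

fn main() {
    let start = Instant::now();
    let mut timer = IdleTimer::new(start, Duration::from_secs(30));

    // 20 s of silence: still within the limit.
    println!("{}", timer.expired(start + Duration::from_secs(20))); // false
    // A message arrives at t = 20 s and resets the clock.
    timer.touch(start + Duration::from_secs(20));
    println!("{}", timer.expired(start + Duration::from_secs(45))); // false (25 s idle)
    println!("{}", timer.expired(start + Duration::from_secs(50))); // true (30 s idle)
}
```

Because any received data resets the timer, pings are only needed when the peer has nothing else to send.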

bbigras commented 7 years ago

> WebSocket is not UDP, and pings are NOT for checking the connection - they are to keep the connection from being closed by the OS!

I use nginx and I think it will also close the connections after a while.

bbigras commented 7 years ago

> I suggest just setting the corresponding TCP timeout on the connection (see the net2 crate) so that it will be closed automatically if there are no pings. This will unblock read_message() with a corresponding error return value.

Would it also work with write_message()? The following example doesn't time out after 2 seconds.

```rust
extern crate net2;
extern crate tungstenite;

use tungstenite::server::accept;
use tungstenite::Message;

use net2::TcpStreamExt;

use std::net::TcpListener;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::{thread, time};

fn main() {
    // One sender per connected client, shared with the broadcaster thread.
    let channels: Arc<RwLock<Vec<Mutex<Sender<usize>>>>> = Arc::new(RwLock::new(Vec::new()));
    let channels_clone1 = channels.clone();

    // Broadcaster: every 5 seconds, send a value to every registered client.
    thread::spawn(move || loop {
        for one in channels_clone1.read().unwrap().iter() {
            one.lock().unwrap().send(1).unwrap();
        }

        thread::sleep(time::Duration::from_secs(5));
    });

    let server = TcpListener::bind("127.0.0.1:3012").unwrap();

    for stream in server.incoming() {
        let channels_clone2 = channels.clone();
        thread::spawn(move || {
            let s = stream.unwrap();
            // 2-second socket timeouts, set before the WebSocket handshake.
            s.set_read_timeout_ms(Some(2000)).unwrap();
            s.set_write_timeout_ms(Some(2000)).unwrap();
            let mut websocket = accept(s).unwrap();

            // Register this connection with the broadcaster.
            let (tx, rx) = channel();
            channels_clone2.write().unwrap().push(Mutex::new(tx));

            // Forward each value from the channel to the client.
            loop {
                let r = rx.recv().unwrap();
                websocket
                    .write_message(Message::Text(r.to_string()))
                    .unwrap();
            }
        });
    }
}
```

agalakhov commented 7 years ago

The code is incorrect. You can't just write_message() and do nothing else. The write_message() call is almost always non-blocking, allowing for reading incoming messages even while sending. Always call read_message() somewhere, even if you just discard the message afterwards.

I should probably document this explicitly.
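
The advice above, to keep reading even in a write-only service, can be sketched as a per-connection loop body that flushes queued outgoing data and then performs one read. This is an assumption-laden illustration: it is written against the generic `Read + Write` traits rather than tungstenite's API, `pump_once`, `Incoming`, and `FakeSocket` are hypothetical names, and the in-memory `FakeSocket` stands in for a socket with a read timeout set.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::sync::mpsc::{channel, Receiver};

/// What one pass over the connection observed.
#[derive(Debug, PartialEq)]
enum Incoming {
    Data(Vec<u8>), // bytes arrived from the peer
    Idle,          // the read timed out; nothing arrived
    Closed,        // the peer closed the connection
}

/// Sends everything currently queued in `outgoing`, then performs one read
/// so that incoming traffic is never left unprocessed.
fn pump_once<S: Read + Write>(
    stream: &mut S,
    outgoing: &Receiver<Vec<u8>>,
) -> io::Result<Incoming> {
    // try_iter() yields the queued items without blocking.
    for msg in outgoing.try_iter() {
        stream.write_all(&msg)?;
    }
    stream.flush()?;

    let mut buf = [0u8; 1024];
    match stream.read(&mut buf) {
        Ok(0) => Ok(Incoming::Closed),
        Ok(n) => Ok(Incoming::Data(buf[..n].to_vec())),
        Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {
            Ok(Incoming::Idle)
        }
        Err(e) => Err(e),
    }
}

/// In-memory stand-in for a socket (hypothetical, for the demo only).
struct FakeSocket {
    input: Vec<u8>,
    output: Vec<u8>,
}

impl Read for FakeSocket {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.input.is_empty() {
            // Mimic a socket whose read timeout elapsed.
            return Err(io::Error::from(ErrorKind::WouldBlock));
        }
        let n = self.input.len().min(buf.len());
        buf[..n].copy_from_slice(&self.input[..n]);
        self.input.drain(..n);
        Ok(n)
    }
}

impl Write for FakeSocket {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.output.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let (tx, rx) = channel();
    tx.send(b"update 1".to_vec()).unwrap();

    let mut sock = FakeSocket { input: b"ping".to_vec(), output: Vec::new() };
    let first = pump_once(&mut sock, &rx)?;
    let second = pump_once(&mut sock, &rx)?;
    println!("{:?} then {:?}; sent {} bytes", first, second, sock.output.len());
    Ok(())
}
```

In a real server the loop would call something like `pump_once` repeatedly and close the connection on `Closed` or after too many consecutive `Idle` results.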

bbigras commented 7 years ago

In my case it's for a pubsub service which gets the data to publish from HTTP, so I think I never want to use read_message(). Maybe it's more related to https://github.com/snapview/tungstenite-rs/issues/11 than to the ping feature.

daniel-abramov commented 7 years ago

What are you trying to achieve?

If you want to drop the connection when no data has been received within the last N seconds, you can simply use non-blocking sockets from tokio together with tungstenite-rs and implement this logic yourself (with or without tokio-tungstenite; if you use tokio, there is already a tokio-timer crate for this, which combines well with other futures). I'm probably a bit confused, as I did not get how those things are related to "ping timeouts".

daniel-abramov commented 7 years ago

And just for reference and clarification, https://github.com/snapview/tokio-tungstenite/issues/6 (which you mentioned here) is not about "ping timeouts"; it was about extending the public interfaces of tokio-tungstenite (and some changes in tungstenite-rs) in order to make it possible to send ping messages from tokio (directly from user code).

The reception of ping messages and the proper reaction to them are already implemented inside tungstenite-rs and are thus supported in tokio-tungstenite internally.

bbigras commented 7 years ago

> What are you trying to achieve?

I want to send data from a channel to multiple ws clients for a pubsub service. I don't want to receive any data on the ws connections. I want the connections to be dropped quickly if a network cable is unplugged (with ws ping timeouts preferably to keep the connection alive with nginx).

I'm currently using ws-rs which allows me to read and write on a ws connection easily from multiple threads (so I can do ping timeouts easily) but I have a weird bug with it and I was hoping to use a better crate and keep it simple (without using tokio).

> And just for reference and clarification, snapview/tokio-tungstenite#6 (which you mentioned here) is not about "ping timeouts"; it was about extending the public interfaces of tokio-tungstenite (and some changes in tungstenite-rs) in order to make it possible to send ping messages from tokio (directly from user code).

Yes.

> The reception of ping messages and the proper reaction to them are already implemented inside tungstenite-rs and are thus supported in tokio-tungstenite internally.

I want to send some too since I run both the clients and the server.

> I did not get how those things are related to "ping timeouts"

I think it would be related if my code were valid (minus the set_read_timeout_ms and set_write_timeout_ms) and tungstenite-rs handled the ping timeouts in the background.

I hope I was clear enough.

daniel-abramov commented 7 years ago

> I want to send data from a channel to multiple ws clients for a pubsub service. I don't want to receive any data on the ws connections. I want the connections to be dropped quickly if a network cable is unplugged (with ws ping timeouts preferably to keep the connection alive with nginx).

OK, I think I got your point. So your server sends data to the clients and also reads data, but the only messages it receives are ping messages, and when no ping message has been received within N milliseconds, you want to drop the connection. Is that the case? If so, you can implement something like that, but you do need to use non-blocking sockets with tungstenite-rs.

> I'm currently using ws-rs which allows me to read and write on a ws connection easily from multiple threads (so I can do ping timeouts easily) but I have a weird bug with it and I was hoping to use a better crate and keep it simple (without using tokio).

If you don't want to switch to tokio, you can use mio: just use the TcpStream from mio with tungstenite-rs, and then you can implement the behavior you want. In fact, ws-rs has mio under the hood, so it's based on non-blocking sockets as well.

> I think it would be related if my code were valid (minus the set_read_timeout_ms and set_write_timeout_ms) and tungstenite-rs handled the ping timeouts in the background.

What do you mean by "in the background"? tungstenite-rs does not create any background threads.

agalakhov commented 7 years ago

> and tungstenite-rs handled the ping timeouts in the background.

There is no "background" in tungstenite. Everything is done in the foreground. You are responsible for letting tungstenite process incoming messages, including pings. This is done by calling read_message(). If it isn't called, no pings will be processed, so you MUST call it.

You may switch your TcpStream into non-blocking mode by calling set_nonblocking(true) on it. If you do so, you must poll the stream's state manually. One way to do so is to use Poll from mio. But beware, the WebSocket protocol is bidirectional, so read_message() may also send something and vice versa.
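
What non-blocking mode means for a plain socket can be shown with the standard library. This is a rough sketch under stated assumptions: it uses std's `TcpStream::set_nonblocking` on a loopback connection with no WebSocket layer, `try_read` is a hypothetical helper, and the sleep-based polling is a stand-in for a readiness mechanism such as mio's Poll.

```rust
use std::io::{ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Tries one read without blocking. Ok(None) means "no data right now".
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server_side, _) = listener.accept()?;
    server_side.set_nonblocking(true)?;

    let mut buf = [0u8; 64];
    // Nothing has been sent yet, so this returns immediately instead of blocking.
    println!("before send: {:?}", try_read(&mut server_side, &mut buf)?);

    client.write_all(b"hello")?;
    // Poll by hand. Real code would wait for readiness instead of sleeping.
    loop {
        if let Some(n) = try_read(&mut server_side, &mut buf)? {
            println!("received {} bytes", n);
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }
    Ok(())
}
```

Between polls the thread is free to do other work, such as checking a timeout or writing queued messages.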

daniel-abramov commented 7 years ago

I'm going to close this issue as it seems that the initial question has been answered.

Just to sum up:

  1. tungstenite-rs does not spawn any threads by itself.
  2. At the moment, neither tungstenite-rs nor tokio-tungstenite informs you that a Ping message has just been processed.
  3. If you need to implement some "message timeouts" and/or general read/write "timeouts", you have to use tungstenite with non-blocking sockets (good candidates are the mio and tokio crates, which contain both an abstraction for TcpStream and a reactor / poll mechanism).