snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars 236 forks source link

`TcpListener::accept` blocks runtime after accepting a websocket connection #304

Closed SmnTin closed 1 year ago

SmnTin commented 1 year ago

main.rs:

use std::{net::SocketAddr, thread, time::Duration};

use futures::SinkExt;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;

#[tokio::main]
async fn main() {
    let addr = launch_server();
    tokio::time::sleep(Duration::from_millis(100)).await;
    launch_client(addr);
    tokio::time::sleep(Duration::from_millis(500)).await;
    println!("It is time to die");
}

fn launch_server() -> SocketAddr {
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
        run_server(listener).await;
    });

    addr
}

fn launch_client(addr: SocketAddr) {
    thread::spawn(move || {
        println!("Client started");
        let url = format!("ws://{}/", addr);
        let (mut socket, _) = tungstenite::connect(&url).unwrap();

        while let Ok(msg) = socket.read() {
            println!("Received: {:?}", msg);
        }
    });
}

async fn run_server(listener: TcpListener) {
    println!("Server started");
    loop {
        println!("Waiting for a client");
        let (stream, _addr) = listener.accept().await.unwrap();
        println!("Client connected");
        let stream = tokio_tungstenite::accept_async(stream).await.unwrap();
        println!("Client accepted");
        tokio::spawn(process_client(stream));
    }
}

async fn process_client(mut stream: WebSocketStream<TcpStream>) {
    println!("Client is being processed");
    loop {
        stream.send(Message::Text("boba".to_owned())).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

Cargo.toml:

[package]
name = "tokio-bug"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.33", features = ["full"] }
tokio-tungstenite = "*"
tungstenite = "*"
futures = "0.3"

rustc version: rustc 1.75.0-nightly (1c05d50c8 2023-10-21) OS: macOS Ventura 13.4.1 (c)

Expected behavior

The client in a separate thread connects to the server and receives a message every 50 ms. After 500 ms, the server and the client shut down.

The log looks like:

Server started
Waiting for a client
Client started
Client connected
Client accepted
Waiting for a client
Client is being processed
Received "boba"
...
Received "boba"
It is time to die

Actual behavior

However, the real execution prints:

Server started
Waiting for a client
Client started
Client connected
Client accepted
Waiting for a client

The following happens:

Instrumenting with tokio-console confirms that the executor is blocked.

SmnTin commented 1 year ago

It turned out that TcpListener::from_std requires that the listener is already in non-blocking mode. It was hard to find since without tungstenite, everything worked just fine.

Setting the std listener to a non-blocking mode before wrapping it into the tokio listener resolves the issue.

agalakhov commented 1 year ago

Yes, we do not change mode of the listener since we can't make any assumptions about its interface except that it can send and receive.