housleyjk / ws-rs

Lightweight, event-driven WebSockets for Rust.
MIT License

Server blocks itself after resending multiple messages #346

Open ARCJ137442 opened 5 months ago

ARCJ137442 commented 5 months ago

A minimal working example (MWE) that reproduces it:

extern crate ws;
use std::thread;

fn main() {
    // A client that sends tons of messages to the server
    thread::spawn(move || {
        let _ = ws::connect("ws://127.0.0.1:3012", |sender| {
            let mut num_send = 0_usize;
            // Generate a thread that constantly sends messages for testing
            thread::spawn(move || loop {
                num_send += 1;
                // The content is just for example, the actual situation has more variety
                let _ = sender.send(format!("overwhelming message #{num_send}!"));
            });

            // Handle nothing
            move |_| Ok(())
        });
    });

    // A server that echoes messages back to the client
    ws::listen("127.0.0.1:3012", |sender| {
        // Handle received message
        move |msg| {
            println!("Got message: {}", msg);
            // ! The server blocks on the line below once the `SyncSender` queue is full
            let _ = sender.send(msg);
            // * If the line above is commented out, the server does not block
            Ok(())
        }
    })
    .unwrap();
}

The output suggests that the server always blocks after echoing about 500 messages (the log stops at #501):

Got message: overwhelming message #1!
Got message: overwhelming message #2!
Got message: overwhelming message #3!
Got message: overwhelming message #4!
Got message: overwhelming message #5!
Got message: overwhelming message #6!
Got message: overwhelming message #7!
Got message: overwhelming message #8!
Got message: overwhelming message #9!
Got message: overwhelming message #10!
Got message: overwhelming message #11!
Got message: overwhelming message #12!
Got message: overwhelming message #13!
Got message: overwhelming message #14!
Got message: overwhelming message #15!
Got message: overwhelming message #16!
Got message: overwhelming message #17!
Got message: overwhelming message #18!
Got message: overwhelming message #19!
Got message: overwhelming message #20!
[... messages #21 through #489 snipped ...]
Got message: overwhelming message #490!
Got message: overwhelming message #491!
Got message: overwhelming message #492!
Got message: overwhelming message #493!
Got message: overwhelming message #494!
Got message: overwhelming message #495!
Got message: overwhelming message #496!
Got message: overwhelming message #497!
Got message: overwhelming message #498!
Got message: overwhelming message #499!
Got message: overwhelming message #500!
Got message: overwhelming message #501!

Once the terminal output stops, the receiving side is completely blocked and almost never recovers. Why does the library use the blocking SyncSender instead of Sender when interfacing with mio internally? Alternatively, is there a workaround for the bounded ws::Sender, for example something like WebSocket::set_non_blocking for sending messages?
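To illustrate the behaviour in question, here is a minimal sketch that uses only the standard library's bounded channel as a stand-in. It is an assumption that ws::Sender behaves the same way; the sketch does not touch ws-rs or mio at all, and `fill_until_full` is a hypothetical helper name. It shows that a bounded queue nobody drains accepts exactly its capacity and then reports `Full`; a blocking `send` at that point would wait forever. That would also explain why the log stops at #501: the message is printed before the send that blocks. If I read the ws::Settings docs correctly, the event loop queue is sized as queue_size × max_connections, which would be 500 with the defaults.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes messages into a bounded channel that nobody drains and returns
/// how many were accepted before the queue reported `Full`.
/// (Stand-in for the bounded queue behind `ws::Sender`; this is an assumption.)
fn fill_until_full(capacity: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected but is never read.
    let (tx, _rx) = sync_channel::<String>(capacity);
    let mut accepted = 0;
    loop {
        // `try_send` never blocks; a plain `send` would hang here once full.
        match tx.try_send(format!("message #{}", accepted + 1)) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}

fn main() {
    let accepted = fill_until_full(500);
    println!("accepted {accepted} messages before the queue was full");
}
```

In the MWE the handler runs on the same thread that is supposed to drain the queue, so a blocking send from inside the handler cannot make progress once the queue is full.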

ARCJ137442 commented 5 months ago

After reading the ws::Settings documentation, I improved the MWE as follows:

extern crate ws;
use std::thread;

fn main() {
    // A client that sends tons of messages to the server
    thread::spawn(move || {
        let _ = ws::connect("ws://127.0.0.1:3012", |sender| {
            let mut num_send = 0_usize;
            // Generate a thread that constantly sends messages for testing
            thread::spawn(move || loop {
                num_send += 1;
                // The content is just for example, the actual situation has more variety
                let _ = sender.send(format!("overwhelming message #{num_send}!"));
            });

            // Handle nothing
            move |_| Ok(())
        });
    });

    // A server that echoes messages back to the client
    ws::Builder::new()
        .with_settings(ws::Settings {
            max_connections: 0x40,
            // * Setting the value below to `usize::MAX` is not an option: it might run out of memory
            queue_size: 0x300,
            // ! Even with the option below enabled, the blocking still happens
            panic_on_queue: true,
            ..Default::default()
        })
        .build(|sender: ws::Sender| {
            // handle received message
            move |msg| {
                println!("Got message: {}", msg);
                println!("from {sender:?}");
                // ! The server blocks on the line below once the `SyncSender` queue is full
                let _ = sender.send(msg);
                // * If the line above is commented out, the server does not block
                Ok(())
            }
        })
        .unwrap()
        .listen("127.0.0.1:3012")
        .unwrap();
}

But the confusing part remains: why not use mio::channel::Sender as the inner sender of ws::Sender, or provide an option to run a server that never blocks when sending messages?
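One possible workaround, sketched here with the standard library only, is to keep the blocking send off the handler thread: the handler pushes into an unbounded channel and a dedicated forwarder thread performs the send that may block. In this sketch a `SyncSender` stands in for ws::Sender, and `spawn_forwarder` and `echo_through_forwarder` are hypothetical names. Whether this fully avoids the stall in ws-rs is an assumption I have not verified.

```rust
use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender};
use std::thread;

/// Hypothetical helper: wraps a bounded sink in an unbounded front-end.
/// Only the forwarder thread ever blocks on the bounded sink.
fn spawn_forwarder<T: Send + 'static>(sink: SyncSender<T>) -> Sender<T> {
    let (front, backlog) = channel::<T>();
    thread::spawn(move || {
        for item in backlog {
            // Blocks while the sink is full, but never on the caller's thread.
            if sink.send(item).is_err() {
                break; // the receiving side is gone
            }
        }
    });
    front
}

/// Sends `count` numbers through the forwarder into a sink with the given
/// capacity, then drains the sink and returns what arrived.
fn echo_through_forwarder(capacity: usize, count: usize) -> Vec<usize> {
    let (sink, drained) = sync_channel::<usize>(capacity);
    let front = spawn_forwarder(sink);
    for n in 0..count {
        // Unbounded send: returns immediately even when the sink is full.
        front.send(n).expect("forwarder thread is alive");
    }
    drop(front); // lets the forwarder finish once the backlog is empty
    drained.iter().collect()
}

fn main() {
    let received = echo_through_forwarder(2, 10);
    println!("received {} messages", received.len());
}
```

In the server from the MWE, the forwarder thread would presumably call `sender.send(msg)` on a clone of the ws::Sender, and the handler would only push into the unbounded channel. The trade-off is the one noted for `queue_size`: the backlog is unbounded, so a client that never reads can still exhaust memory.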