websockets-rs / rust-websocket

A WebSocket (RFC6455) library written in Rust
http://websockets-rs.github.io/rust-websocket/
MIT License

Half-blocking method. Is it supported/possible? #271

Open MageSlayer opened 1 year ago

MageSlayer commented 1 year ago

Hi

I am trying to implement a "half-blocking" mode, that is, blocking write and non-blocking read. Currently I use the following code together with tungstenite. The blocking write is done in "write_inner". TcpStream is from the "mio" crate.

Unfortunately, it does not work reliably. I am getting various errors during the handshake and later on.

I'd like to ask whether rust-websocket could be used to implement a working scheme like that.

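// Imports inferred from usage below (assumes the mio 0.7+ API and tungstenite).
use std::io::{self, Read, Write};
use std::net::ToSocketAddrs;
use std::thread;

use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};
use tungstenite::client::client;
use tungstenite::http::Uri;
use tungstenite::{Error, Message, WebSocket};

struct TcpCustomStream {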
    s: TcpStream,

    block: bool,
    poll: Poll,
    events: Events,
}

impl TcpCustomStream {
    pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
        let a = if let Some(x) = addr.to_socket_addrs()?.next() {
            x
        } else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Tcp connect call failed",
            ));
        };
        match TcpStream::connect(a) {
            Ok(mut x) => {
                let poll = Poll::new()?;
                let events = Events::with_capacity(128);

                // Register the socket with `Poll`
                poll.registry()
                    .register(&mut x, Token(0), Interest::WRITABLE)?;

                Ok(Self {
                    s: x,
                    poll,
                    events,
                    block: true,
                })
            }
            Err(x) => Err(x),
        }
    }

    pub fn set_nonblocking(&mut self) -> io::Result<()> {
        self.block = false;
        Ok(())
    }

    pub fn wait_write(&mut self) -> io::Result<()> {
        loop {
            //println!("tcp wait_write poll");
            self.poll.poll(&mut self.events, None)?;

            for event in &self.events {
                if event.token() == Token(0) && event.is_writable() {
                    // The socket connected (probably, it could still be a spurious
                    // wakeup)
                    //println!("tcp wait_write ready");
                    return Ok(());
                }
            }
        }
    }

}

impl std::io::Read for TcpCustomStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            match self.s.read(buf) {
                Err(x) if self.block && would_block(&x) => {
                    //println!("tcp read would_block {:?}", x);
                    thread::yield_now();
                }
                // x @ Err(_) => {
                //     //println!("tcp read err {:?}", x);
                //     break x;
                // }
                x => break x,
            }
        }
    }
}

impl std::io::Write for TcpCustomStream {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Pass the write straight through; WouldBlock is handled by the caller (write_inner).
        self.s.write(buf)
    }
    fn flush(&mut self) -> std::io::Result<()> {
        self.s.flush()
    }
}

... skipped
pub struct Client {
    socket: Option<WebSocket<TcpCustomStream>>,
}

// Treats both WouldBlock and Interrupted as "retry later".
fn would_block(err: &std::io::Error) -> bool {
    matches!(
        err.kind(),
        io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
    )
}

impl Client {
    pub fn new() -> Self {
        Self {
            socket: None,
        }
    }

    pub fn connect<'a>(&mut self, uri_str: &'a str) -> Result<(), std::io::Error> {
        println!("Client started");

        let (mut client, _) = {
            let mut c = 10;
            loop {
                let req = Uri::from_maybe_shared(uri_str.to_string()).map_err(|_| {
                    std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid connect URI")
                })?;

                let host_port = req
                    .authority()
                    .ok_or(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "Invalid connect URI",
                    ))?
                    .as_str();

                let stream = TcpCustomStream::connect(host_port)?;
                println!("Client connected");

                match client(req, stream) {
                    Err(x) => {
                        println!("Connect {:?}", x);
                        if c == 1 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!("Cannot connect to {}", uri_str),
                            ));
                        }
                        c -= 1;
                        thread::sleep(std::time::Duration::from_millis(1000));
                    }
                    Ok(x) => break x,
                }
            }
        };
        println!("WSClient connected");

        client.get_mut().set_nonblocking()?;
        println!("Set nonblocking mode");

        self.socket = Some(client);

        Ok(())
    }

    fn write_inner(socket: &mut WebSocket<TcpCustomStream>, m: Message) -> Result<(), Error> {
        match socket.write_message(m) {
            Err(tungstenite::Error::Io(e)) if would_block(&e) => {
                //println!("cl write would_block");
                return loop {
                    socket.get_mut().wait_write()?;
                    match socket.write_pending() {
                        Err(tungstenite::Error::Io(e)) if would_block(&e) => continue,
                        x => break x,
                    }
                };
            }
            x => {
                //println!("cl write {:?}", x);
                x
            }
        }
    }

vi commented 1 year ago

It is not recommended to use rust-websocket (i.e. the websocket crate) for new projects; you should stick with tungstenite.

If by "half-blocking" you mean writing messages to the WebSocket from the sync world and reading from it in the async (e.g. Tokio) world (or the same with reading and writing swapped), then you should use tokio-tungstenite in async mode and make a channel (e.g. tokio::sync::mpsc or flume) to bridge the sync and async worlds.

You should specify your use case (how such "half-blocking" WebSockets integrate into a larger scheme) for more specific advice.

MageSlayer commented 1 year ago

You should specify your use case (how such "half-blocking" WebSockets integrate into a larger scheme) for more specific advice.

My scheme is the following:

  1. I have a Rust server capable of accepting WebSocket connections. It works and behaves OK. It's built with tungstenite.
  2. Client. I have a third-party proprietary closed-source system, which loads dynamic libraries and calls functions via a C-like FFI interface. These calls are made in a single thread, and I cannot block there. I also cannot spawn any additional threads there. So there is no runtime (Rust or any other).

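So the only approach is a non-blocking, single-threaded, C-like manner. That is what I call half-blocking mode. So it's:

  - eager reading from socket if anything is there or flagging if no message received
  - blocking on socket write as it's expected to be rather quick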

My point is that reading is problematic here. It requires some sort of "cursor" that tells the other parts of the system where reading left off last time if a message wasn't read entirely (buffering). Unfortunately, I cannot see where this "cursor" might be implemented reliably in my scheme.

Perhaps the same issue in the tungstenite tracker is useful here: https://github.com/snapview/tungstenite-rs/issues/308
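
The "cursor" described above can be pictured as a buffer that survives between timer ticks: each tick appends whatever bytes are available and hands out a message only once it is complete. The following is a minimal sketch under stated assumptions, not tungstenite's implementation. `FrameBuffer`, `fill_from`, and `next_frame` are hypothetical names, and the framing (a 4-byte big-endian length prefix) is a stand-in chosen for brevity, not the WebSocket frame format.

```rust
use std::io::{self, ErrorKind, Read};

/// Accumulates bytes across non-blocking reads; this is the "cursor".
pub struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    pub fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Pulls whatever bytes are available right now without blocking.
    /// Returns Ok(true) if the source reported end of stream.
    pub fn fill_from<R: Read>(&mut self, src: &mut R) -> io::Result<bool> {
        let mut chunk = [0u8; 4096];
        loop {
            match src.read(&mut chunk) {
                Ok(0) => return Ok(true),
                Ok(n) => self.buf.extend_from_slice(&chunk[..n]),
                // Nothing more to read for now; keep the partial data for the next tick.
                Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(false),
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
    }

    /// Returns one complete frame if it is fully buffered, otherwise None.
    /// Assumed framing (hypothetical): 4-byte big-endian length, then payload.
    pub fn next_frame(&mut self) -> Option<Vec<u8>> {
        if self.buf.len() < 4 {
            return None;
        }
        let len =
            u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]) as usize;
        if self.buf.len() < 4 + len {
            return None; // incomplete: leave the bytes in place
        }
        let frame = self.buf[4..4 + len].to_vec();
        self.buf.drain(..4 + len);
        Some(frame)
    }
}
```

On each timer tick the host would call `fill_from` on the non-blocking socket and then drain `next_frame` until it returns `None`; a `None` result is the "no message received" flag.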

vi commented 1 year ago

eager reading from socket if anything is there or flagging if no message received

What thread will wait for socket events?

flagging if no message received

What happens after such flagging? How will the application know when to attempt reading from the websocket next time?


blocking on socket write as it's expected to be rather quick

Note that if your Rust server (or the network between the client and the server) is slow, then that socket write becomes lengthy: backpressure.
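
To illustrate the backpressure point: if the socket stops accepting bytes, a blocking write stalls the host's only thread. One alternative is to queue outgoing bytes and flush only what the socket accepts on each tick. This is a rough sketch, assuming raw bytes rather than WebSocket frames; `Outbox`, `flush_some`, and `ThrottledSink` are hypothetical names, and `ThrottledSink` is only a test double for a congested socket.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Write};

/// Outgoing bytes that have not been accepted by the socket yet.
pub struct Outbox {
    pending: VecDeque<u8>,
}

impl Outbox {
    pub fn new() -> Self {
        Self { pending: VecDeque::new() }
    }

    /// Queues bytes for sending; never touches the socket.
    pub fn queue(&mut self, bytes: &[u8]) {
        self.pending.extend(bytes);
    }

    /// Writes as much as the destination accepts right now.
    /// Returns the number of bytes still waiting to be sent.
    pub fn flush_some<W: Write>(&mut self, dst: &mut W) -> io::Result<usize> {
        while !self.pending.is_empty() {
            let (front, _) = self.pending.as_slices();
            match dst.write(front) {
                Ok(0) => return Err(ErrorKind::WriteZero.into()),
                Ok(n) => {
                    self.pending.drain(..n);
                }
                // Socket buffer is full: stop and retry on the next tick.
                Err(e) if e.kind() == ErrorKind::WouldBlock => break,
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(self.pending.len())
    }
}

/// Test double (hypothetical): accepts `budget` bytes, then reports WouldBlock.
pub struct ThrottledSink {
    pub budget: usize,
    pub written: Vec<u8>,
}

impl Write for ThrottledSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.budget);
        self.written.extend_from_slice(&buf[..n]);
        self.budget -= n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

The trade-off is memory: if the peer stays slow, the queue grows, so a real plugin would likely cap its size or drop messages.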

MageSlayer commented 1 year ago

What thread will wait for socket events?

It's the same single thread, the only one available.

What happens after such flagging? How will the application know when to attempt reading from the websocket next time?

It's done by a timer. So it's poor man's polling.

blocking on socket write as it's expected to be rather quick

Note that if your Rust server (or the network between the client and the server) is slow, then that socket write becomes lengthy: backpressure.

Yes. That's unfortunate, but I cannot see any other viable alternative.

vi commented 1 year ago

It's done by timer. So it's poor man's polling.

OK, so we are in the land of hacks.

In this case I would still use async/nonblocking everywhere (including for sending) by rolling customized low-level async utils (socket wrapper, timers) and maybe an executor (though async-executor's try_tick() seems to do the trick by itself; maybe futures_executor::LocalPool is an even better match).

This way the only tradeoffs I expect are:

Other than that, from the outside the plugin should look as if the socket were used directly, without any additional file descriptors, threads, and so on.


Here is my demo using async-tungstenite: https://gist.github.com/vi/28117c2583ea74d35babfcd6abbef9e6

It should handle backpressure properly.

Maybe there are ready-made crates for such a use case, but sometimes it is simpler to write one than to find one.
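
The idea of a hand-rolled executor that the host's timer callback ticks can be sketched with the standard library alone. This is not the code from the linked gist; it is a minimal illustration, assuming a single top-level future and a waker that does nothing, so every tick simply polls again. `TickDriver`, `tick`, `NoopWake`, and `YieldOnce` are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that ignores wake-ups; the timer re-polls on every tick anyway.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Holds one top-level future and polls it once per call to `tick`.
pub struct TickDriver<T> {
    fut: Option<Pin<Box<dyn Future<Output = T>>>>,
}

impl<T> TickDriver<T> {
    pub fn new(fut: impl Future<Output = T> + 'static) -> Self {
        Self { fut: Some(Box::pin(fut)) }
    }

    /// Polls the future once without blocking.
    /// Returns Some(output) on the tick where the future completes, None otherwise.
    pub fn tick(&mut self) -> Option<T> {
        let fut = self.fut.as_mut()?;
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => {
                self.fut = None; // never poll a completed future again
                Some(value)
            }
            Poll::Pending => None,
        }
    }
}

/// Helper future for demonstration: Pending on the first poll, Ready on the second.
pub struct YieldOnce(pub bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}
```

The host's timer callback would call `tick()` and return immediately. Polling on every tick wastes some work compared with a real waker-driven executor, which is part of the price of timer-based polling.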

MageSlayer commented 1 year ago

That's a real gift (not gist) :) I'll try that asap!

MageSlayer commented 1 year ago

Hi again. Sorry for the late response, but I'd like to ask some more stupid questions :)

I'm not that experienced in async Rust, so it looks like "subtask1" & "subtask2" are doing async send/receive. Thus a question - how can I "drive" them synchronously?

I mean: should I build an intermediate queue between the sync and async code to pass values when sending/receiving from the host application? Using something like the following does not seem right, as the "main loop" is hidden behind self.exe.try_tick().

let send_fut = c_tx.send(Message::Text(format!("Hello, {}", 1))); //.await;
block_on(send_fut)?;

vi commented 1 year ago

I'm not that experienced in async Rust, so it looks like "subtask1" & "subtask2" are doing async send/receive. Thus a question - how can I "drive" them synchronously?

What do you mean by "to drive synchronously"?

If you want to interact with sync code, you'll probably need a channel like flume. This channel can be sync from one side and async from the other side. Maybe it would just work as is.


For driving subtasks simultaneously futures::future::select is only one of the ways. Just adding more tasks to the executor (i.e. multiple exe.spawns) would probably be better.


Here is my second demo that shows some of the ideas above applied:

https://gist.github.com/vi/39607d1963b069a5167099f3fbffebf4

MageSlayer commented 1 year ago

If you want to interact with sync code, you'll probably need a channel like flume. This channel can be sync from one side and async from the other side. Maybe it would just work as is.

Yes. I'd like to send/receive values to/from the async part from sync functions.

Here is my second demo that shows some of the ideas above applied:

Thanks a lot for the details. So, I guess the right way to emulate "non-blocking receive" is to read the channel after doing the following. Right?

self.wakers.wake_all();
self.exe.run_until_stalled();

vi commented 1 year ago

Thanks a lot for the details. So, I guess the right way to emulate "non-blocking receive" is to read the channel after doing the following. Right?

Yes, using flume::Receiver::try_recv.
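
As an illustration of the sync side of such a non-blocking receive, here is a small sketch. It uses std::sync::mpsc in place of flume so that it has no dependencies; that substitution is an assumption for the example, and `poll_incoming` and `PollResult` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// What the host-facing poll function reports back.
#[derive(Debug, PartialEq)]
pub enum PollResult {
    Message(String),
    Empty,
    Closed,
}

/// Non-blocking receive: call this after the executor has been ticked.
pub fn poll_incoming(rx: &Receiver<String>) -> PollResult {
    match rx.try_recv() {
        Ok(msg) => PollResult::Message(msg),
        // Nothing arrived since the last tick; the host tries again later.
        Err(TryRecvError::Empty) => PollResult::Empty,
        // The sending side (the async task) is gone.
        Err(TryRecvError::Disconnected) => PollResult::Closed,
    }
}
```

Distinguishing `Empty` from `Closed` lets the host tell "no message yet" apart from "the connection task has ended".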

Note that if you want to do trickier things (timeouts, retries, reconnects, simultaneous operations) while staying single-threaded and non-blocking, then you may prefer doing them within the async world and only delivering the final result to the sync side when needed.

MageSlayer commented 1 year ago

Note that if you want to do trickier things (timeouts, retries, reconnects, simultaneous operations) while staying single-threaded and non-blocking, then you may prefer doing them within the async world and only delivering the final result to the sync side when needed.

Yes. I guess it looks possible now with your help. Thanks a lot for your help and especially for the examples.