snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License

websockets initial messages delayed multiple seconds #278

Closed. rukai closed this issue 1 year ago

rukai commented 1 year ago

Originally filed under https://github.com/tokio-rs/axum/issues/1961, but I've now reproduced it with just tokio-tungstenite.

Please refer to this repo, which reproduces the issue: https://github.com/rukai/tokio-tungstenite/tree/bug_report. Make sure to check out the bug_report branch and then run:

    tokio-tungstenite> cargo run --example server
    tokio-tungstenite> cargo run --example client

I create a WebSocket handler on the server which responds with many messages for each message that the client sends it. The server sends each message 1 second apart, but the client receives the first 3 messages all at once after 3 seconds. From that point on, things proceed reasonably. Instead of this behavior, I expect the initial messages to arrive immediately after being sent.

Here's an example run of the client; the first 3 messages arrive ~10 microseconds apart instead of ~1 second apart:

>>> 1 25.988µs got 3 bytes: [1, 2, 3]
>>> 2 38.191µs got 3 bytes: [1, 2, 3]
>>> 3 42.449µs got 3 bytes: [1, 2, 3]
>>> 4 1.001139206s got 3 bytes: [1, 2, 3]
>>> 5 2.002299371s got 3 bytes: [1, 2, 3]
>>> 6 3.003446293s got 3 bytes: [1, 2, 3]
>>> 7 4.004633188s got 3 bytes: [1, 2, 3]
>>> 8 5.005915665s got 3 bytes: [1, 2, 3]
>>> 9 6.007027389s got 3 bytes: [1, 2, 3]
>>> 10 7.007849381s got 3 bytes: [1, 2, 3]
>>> 11 8.009091401s got 3 bytes: [1, 2, 3]
>>> 12 9.010271173s got 3 bytes: [1, 2, 3]
>>> 13 10.011424777s got 3 bytes: [1, 2, 3]
>>> 14 11.012627873s got 3 bytes: [1, 2, 3]
>>> 15 12.01381498s got 3 bytes: [1, 2, 3]
>>> 16 13.015029678s got 3 bytes: [1, 2, 3]
>>> 17 14.016179315s got 3 bytes: [1, 2, 3]
>>> 18 15.016951313s got 3 bytes: [1, 2, 3]
>>> 19 16.018198291s got 3 bytes: [1, 2, 3]
>>> 20 17.018960812s got 3 bytes: [1, 2, 3]

In my application I have seen much more severe delays than this, but I couldn't recreate those severe cases in a simple scenario. My application runs over a real network, which may be making the issue worse than the simple localhost test in the reproduction example.

I am running on Linux and have not tested on other operating systems.

daniel-abramov commented 1 year ago

The behavior that you described is exactly the behavior that you've implemented in your code. You're sending 3 messages from the client, each 1 second apart, and you start reading the messages on the socket only after sending these 3 messages (i.e. after 3 seconds).
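
For illustration, here is a minimal sketch of that client pattern (hypothetical code, assuming a local server at ws://127.0.0.1:8080; it is not the exact code from the bug_report branch):

    // All three sends happen before the read loop starts, so by the time the
    // client begins reading, the first replies are already sitting in the
    // socket buffer and are received back-to-back.
    use futures_util::{SinkExt, StreamExt};
    use tokio_tungstenite::{connect_async, tungstenite::Message};

    #[tokio::main]
    async fn main() {
        let (mut ws, _) = connect_async("ws://127.0.0.1:8080").await.unwrap();

        // Phase 1: send 3 messages, one second apart (3 seconds in total).
        for _ in 0..3 {
            ws.send(Message::Binary(vec![1, 2, 3])).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }

        // Phase 2: only now start reading, so the replies produced during
        // phase 1 all arrive "at once".
        while let Some(Ok(msg)) = ws.next().await {
            println!("got {} bytes", msg.into_data().len());
        }
    }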

rukai commented 1 year ago

Oh, you are absolutely right, and revisiting the problem this morning I've figured out the problem in my application. I assumed that a send would return immediately after the bytes are passed to the networking stack, like in TCP. However, it appears that in WebSockets the send does not return until a corresponding receive has occurred on the other side. This was causing my client reads to never be reached, because the initial sequence of client sends blocked before reaching them.

Thank you for your help.

daniel-abramov commented 1 year ago

However, it appears that in WebSockets the send does not return until a corresponding receive has occurred on the other side.

There is no such behavior for send(), neither for WebSockets nor for TCP. The send() future resolves once the data has been written to the socket.

rukai commented 1 year ago

There is no such behavior for send(), neither for WebSockets nor for TCP. The send() future resolves once the data has been written to the socket.

Right, I was close but still off.

I've reproduced the problem down to this case:

server:

    let (mut outgoing, mut incoming) = ws_stream.split();
    while let Some(Ok(_message)) = incoming.next().await {
        // Simulate slow processing of each incoming message.
        tokio::time::sleep(std::time::Duration::from_millis(10_000)).await;
    }

client:

    sender.send(Message::Binary(vec![0; 1])).await.unwrap();
    sender.send(Message::Binary(vec![0; 16777900])).await.unwrap(); // this blocks for 10s
  1. client sends a 1 byte message to the server.
  2. server begins processing 1 byte message. (by sleeping for 10s)
  3. client sends a large (~16 MB) message to the server.
  4. server is still processing 1 byte message.
  5. client blocks on the send for the large message until the server completes processing the 1 byte message. (10s)

My understanding of the cause of step 5 is that the TCP connection is exerting backpressure. The send in step 3 sends some packets, but eventually the TCP receive window drops to 0 and the send comes to a halt. This is because the server cannot begin processing any of those packets until it finishes processing the first message and can proceed to the next next() call.
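
To illustrate (a hypothetical variant of the server fragment above, not what my application actually does): if the slow processing is spawned onto its own task, the read loop keeps calling next() and draining the socket, and the large send no longer stalls:

    // Hypothetical variant of the server loop: spawning the slow "processing"
    // keeps the read loop draining the socket, so the TCP receive window never
    // drops to 0 and the client's large send completes promptly.
    while let Some(Ok(_message)) = incoming.next().await {
        tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_millis(10_000)).await;
        });
    }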

My understanding is that this is not a bug in tungstenite but just intended behaviour, does that sound right to you?

daniel-abramov commented 1 year ago

Ah, now I get what you mean.

Yes, that's almost as you described. See, TCP is a reliable transmission protocol, so the data is sent in segments, each of which must be acknowledged. But imagine that after establishing a socket connection a user placed 1 GB of data into the socket without the other side reading it. Where would the data end up? It either has to be buffered or dropped. Dropping would go against the design of TCP, so it gets "buffered" (I'm simplifying a bit here), but the buffer size is not unlimited and is in fact rather small (sensibly so). Hence, if you've written a decent amount of data to the socket that has not yet been read and acknowledged and you try to write more, you'll get an EWOULDBLOCK error on the socket.

The semantics of the send() function (from the SinkExt extension trait in Rust) are that it not only enqueues the message but also flushes the buffer, hence the behavior that you're observing.
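
For example (a sketch, assuming sender is the Sink half of a tokio-tungstenite stream, using the plain futures SinkExt combinators):

    // send() enqueues the frame and flushes it to the socket; the future only
    // resolves once the write has gone through.
    sender.send(Message::Binary(vec![0; 1])).await?;

    // feed() only enqueues the frame without flushing; a later flush() (or a
    // subsequent send()) writes the queued data out.
    sender.feed(Message::Binary(vec![0; 1])).await?;
    sender.flush().await?;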

In other words: the described behavior is just the regular behavior of TCP sockets. You could solve it by introducing a queue of messages, so that sending does not flush the data but just adds it to the queue, and spawning a separate task that flushes the queue. That way you don't block the sending part.

It can be as simple as creating an mpsc channel that accepts tungstenite::Message, letting the "sender" task write data to the channel (instead of writing it directly to the socket), and then spawning another Tokio task that simply does something like rx.forward(sink), where sink is tokio-tungstenite's sink. There are also examples in our repository that demonstrate the usage of tx/rx mpsc channels with tokio-tungstenite.
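
For instance, a minimal sketch of that approach on the client side (hypothetical code modeled on the tx/rx pattern from the repository's examples; names, the address, and the use of the futures-channel crate are illustrative):

    // Queue-based sending: the mpsc channel decouples "enqueue a message"
    // from "flush it to the socket", so the sending side never blocks on
    // TCP backpressure.
    use futures_util::StreamExt;
    use tokio_tungstenite::{connect_async, tungstenite::Message};

    #[tokio::main]
    async fn main() {
        let (ws, _) = connect_async("ws://127.0.0.1:8080").await.unwrap();
        let (sink, mut stream) = ws.split();

        // Channel that accepts tungstenite Messages.
        let (tx, rx) = futures_channel::mpsc::unbounded::<Message>();

        // A separate task drains the queue into the WebSocket sink.
        tokio::spawn(rx.map(Ok).forward(sink));

        // Sending now only pushes onto the queue and returns immediately.
        tx.unbounded_send(Message::Binary(vec![0; 1])).unwrap();
        tx.unbounded_send(Message::Binary(vec![0; 16_777_900])).unwrap();

        // The read loop runs concurrently with the queued sends.
        while let Some(Ok(msg)) = stream.next().await {
            println!("got {} bytes", msg.into_data().len());
        }
    }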