Firstly, you can set an upper bound with `WebSocketConfig::max_write_buffer_size`. When calling `write` and a frame would cause the write-buffer to grow larger than this, you'll get an `Err`. By default, yes, it is unbounded.

However, it's a misconception that the write-buffer will necessarily fill up at all. Even if you only call `feed` and never actually `flush`, we still wouldn't generally expect it to fill up. That's because tungstenite writes it to the underlying stream. Once it is successfully written there, it is removed from the tungstenite write-buffer.

In fact, before https://github.com/snapview/tungstenite-rs/pull/358 each `write`/`feed` would always write to the underlying stream eagerly on each call. So the write-buffer can only build up when there are write io failures. That is important and the primary use of the write-buffer: to ensure partial writes can be resumed after errors on the next calls to `write`/`flush`.

After https://github.com/snapview/tungstenite-rs/pull/358, tungstenite does actually use the write-buffer a bit more as an optimisation and writes to the underlying stream every ~128 KiB. This makes things a bit faster (depending on the underlying stream) but doesn't really change the core behaviour. This value is also configurable with `WebSocketConfig::write_buffer_size` (setting it to zero gives the old behaviour).
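To make that concrete, here is a minimal sketch of setting the two knobs (field names assume a tungstenite version that includes PR #358; the helper function is invented for illustration):

```rust
use tungstenite::protocol::WebSocketConfig;

// Sketch only: these fields exist in versions that include PR #358.
fn bounded_config() -> WebSocketConfig {
    let mut config = WebSocketConfig::default();
    config.write_buffer_size = 128 * 1024;          // write through to the stream roughly every 128 KiB
    config.max_write_buffer_size = 4 * 1024 * 1024; // writes beyond this return an error
                                                    // instead of buffering without bound
    config
}
```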
As you would expect, you should call `flush` to guarantee that whatever you have passed to `write`/`feed` is actually flushed. Sometimes (often? typically?) the underlying stream will itself implicitly flush after some amount of writes, in which case even without explicit calls to `flush` the writing and flushing is done quite efficiently (at least this is the behaviour I observed when testing it; I should probably confirm by looking at the underlying impls).

So it is possible that, if you haven't called `flush`, some of your messages will not have been flushed. But it is unlikely that the write-buffer is actually filling up unless you are observing lots of write errors.
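As a hedged illustration of that advice (the `send_batch` helper is made up for this example, not part of either crate): batch with `feed`, then make delivery explicit with one `flush`.

```rust
use futures::{Sink, SinkExt};
use tungstenite::protocol::Message;

// Hypothetical helper: queue several messages, then flush once.
async fn send_batch<S>(ws: &mut S, msgs: Vec<Message>) -> Result<(), S::Error>
where
    S: Sink<Message> + Unpin,
{
    for msg in msgs {
        // feed() writes into the websocket (and usually through to the stream),
        // but gives no guarantee that everything has been flushed yet.
        ws.feed(msg).await?;
    }
    // flush() is the call that guarantees the queued messages have left the
    // write-buffer and been flushed to the underlying stream.
    ws.flush().await
}
```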
The write buffer is actually intended to store messages in case they can't be sent right now. Reasons not to send include network congestion or previous messages still being sent. Correct code is expected to flush if there are real-time considerations.

If you're writing some real-time communication, keep in mind that WebSocket is TCP-based. TCP has its own internal buffering algorithms (such as Nagle's), and some of them may even be running on intermediate network routers far beyond your code's reach.
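For what it's worth, the only piece of that TCP buffering you control directly from the client is Nagle's algorithm on your own socket. A sketch, assuming a plain tokio `TcpStream` handed to `client_async` (the address and helper are invented for the example):

```rust
use anyhow::Result;
use tokio::net::TcpStream;

// Sketch: disable Nagle before the websocket handshake if latency matters.
async fn connect_low_latency(addr: &str) -> Result<()> {
    let stream = TcpStream::connect(addr).await?;
    stream.set_nodelay(true)?; // don't coalesce small writes on our side
    let (_ws, _resp) =
        tokio_tungstenite::client_async(format!("ws://{addr}/"), stream).await?;
    Ok(())
}
```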
> However, it's a misconception that the write-buffer will necessarily fill up at all. Even if you only call `feed` and never actually `flush` we still wouldn't generally expect it to fill up. That's because tungstenite writes it to the underlying stream. Once it is successfully written there it is removed from the tungstenite write-buffer.
Is this true even in unusual / bad situations, such as an unresponsive server? I tried to mock up a small reproducer and landed on this, which is admittedly hacky in multiple ways:
```rust
use anyhow::Context;
use anyhow::Result;
use futures::SinkExt;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::net::TcpSocket;
use tokio::select;
use tungstenite::protocol::Message;
use tungstenite::protocol::WebSocketConfig;

async fn server(port: u16) -> Result<()> {
    let accepter = TcpListener::bind(format!("127.0.0.1:{port}")).await?;
    loop {
        let (stream, peer) = accepter.accept().await?;
        println!("server: accepted connection from {peer}");
        let _ws = tokio_tungstenite::accept_async(stream)
            .await
            .context("accept_async failed")?;
        println!("server: accepted websocket connection");
        println!("server: going to sleep with connection open...");
        loop {
            tokio::time::sleep(Duration::from_secs(1024)).await;
        }
    }
}

async fn client(port: u16) -> Result<()> {
    let config = WebSocketConfig {
        max_send_queue: None,
        max_message_size: Some(64 << 20),
        max_frame_size: Some(16 << 20),
        accept_unmasked_frames: false,
        //max_write_buffer_size: usize::MAX,
        //write_buffer_size: 0,
    };
    let addr = format!("127.0.0.1:{port}");
    let sock = TcpSocket::new_v4()?;
    let stream = sock.connect(addr.parse().unwrap()).await?;
    let (mut ws, _resp) = tokio_tungstenite::client_async_with_config(
        format!("ws://{addr}/"),
        stream,
        Some(config),
    )
    .await
    .context("client_async_with_config failed")?;
    let mut sent_count = 0;
    loop {
        select! {
            result = ws.feed(Message::Text("Hello".to_string())) => {
                match result {
                    Ok(()) => {
                        println!("client: sent Hello {sent_count} times");
                        sent_count += 1;
                    }
                    Err(err) => {
                        println!("client feed failed: {err}");
                        return Err(err.into());
                    }
                }
            }
            _ = tokio::time::sleep(Duration::from_millis(500)) => {
                println!("client: timeout waiting to send");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let port = 7777;
    let t1 = tokio::spawn(server(port));
    // give server time to bind, gross hack
    tokio::time::sleep(Duration::from_millis(500)).await;
    let t2 = tokio::spawn(client(port));
    _ = futures::join!(t1, t2);
}
```
The server accepts a connection but then never reads, to emulate it hanging. The client tries to send constantly. Using the published 0.19 versions, if I run this on my machine I see the client eventually start timing out (meaning the future returned by `feed()` is not `Ready`):
```
server: accepted connection from 127.0.0.1:37780
server: accepted websocket connection
server: going to sleep with connection open...
client: sent Hello 0 times
client: sent Hello 1 times
client: sent Hello 2 times
client: sent Hello 3 times
... snip ...
client: sent Hello 238309 times
client: sent Hello 238310 times
client: sent Hello 238311 times
client: sent Hello 238312 times
client: sent Hello 238313 times
client: timeout waiting to send
client: timeout waiting to send
client: timeout waiting to send
client: timeout waiting to send
client: timeout waiting to send
```
If I switch to the current main branches and uncomment the two new fields in `WebSocketConfig`, I never see `client: timeout waiting to send`; it appears to continue to buffer indefinitely. If I set `max_write_buffer_size` to a value smaller than `usize::MAX`, the client sends until it hits that size, and then the future returned by `feed()` fails:
```
...
client: sent Hello 95449 times
client: sent Hello 95450 times
client: sent Hello 95451 times
client feed failed: Write buffer is full
```
Neither of these matches the behavior of the released versions, which is the behavior I think I would expect? (I.e., if we're still connected but not able to continue to send to the server, `feed()` is not `Ready`, and so we start to see timeouts.)
Thanks for the working example @jgallagher. What's happening here is that, after a little while, tungstenite `write` is returning `WouldBlock` errors.
> So the write-buffer can only build up when there are write io failures.
So this is correct from a tungstenite user's point of view. tokio-tungstenite has one important difference in that it ignores `WouldBlock` errors. So the write-buffer can indeed fill up without error if the underlying stream is returning this.

Note: the current version 0.19 doesn't behave this way because it calls `flush` on every `feed`, which is incorrect/suboptimal.
So the question is how do we want tokio-tungstenite to behave?
My first thought is having `poll_ready` return `Pending` after `feed` has hit `WouldBlock` errors.
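To spell out which error is being talked about, here is a rough sketch that assumes nothing about tokio-tungstenite's internals (the `classify` helper is invented purely for illustration): the relevant case is `io::ErrorKind::WouldBlock` wrapped in tungstenite's `Error::Io`.

```rust
use std::io;
use tungstenite::Error as WsError;

// Invented helper: how a WouldBlock result from write/feed could be treated
// as backpressure rather than silently letting the write-buffer grow.
fn classify(result: Result<(), WsError>) -> &'static str {
    match result {
        Ok(()) => "written (or buffered) and the stream accepted it",
        Err(WsError::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {
            // The idea above: surface this as Pending from poll_ready
            // instead of ignoring it.
            "stream not writable yet: back off and retry later"
        }
        Err(_) => "hard error: the connection is in trouble",
    }
}
```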
> My first thought is having `poll_ready` return `Pending` after feed has hit `WouldBlock` errors.
Perhaps https://github.com/snapview/tokio-tungstenite/pull/287 addresses this then?
This would make the statement "write-buffer can only build up when there are write io failures" true for tokio-tungstenite users too.
> So this is correct from a tungstenite user's point of view. tokio-tungstenite has one important difference in that it ignores `WouldBlock` errors. So the write-buffer can indeed fill up without error if the underlying stream is returning this.
>
> Note: Current version 0.19 doesn't behave this way because it calls `flush` on every `feed`, which is incorrect/suboptimal.
Since `WebSocketStream` implements `Sink`, we must ensure that we uphold the "contract" of the trait. The main idea behind `poll_ready()` (according to the documentation) is that it prepares the sink to accept new data so that the subsequent `start_send()` can add the item to the sink successfully.

So if we judge the code strictly, then the change that made `poll_ready()` always return `Ready` unconditionally was incorrect, as it's the same as asserting that the sink is always ready to accept new data regardless of its internal state. Moreover, an error returned from `start_send()` oftentimes means that the sink will not accept any new items (as per the documentation), so we probably really don't want to return errors from it unless it's a real error that terminates the sink.
I think the correct way to ensure that `poll_ready()` prepares the sink to accept new items is to call `flush()` the same way we did in the previous versions. Yes, it's surely inefficient in the scenarios that you described in the previous PRs, but there was no simple and elegant way to do it with the former versions of `tungstenite`, and when we needed to choose between "fast" and "correct" behavior, we've always leaned towards the correct and predictable behavior (after all, if performance is of paramount importance, there are crates that are specifically optimized for it).
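To illustrate that reading of the `Sink` contract with a self-contained toy (emphatically not the crate's actual implementation): a bounded sink whose `poll_ready` makes room by flushing when its queue is full, so the subsequent `start_send` always succeeds.

```rust
use std::collections::VecDeque;
use std::convert::Infallible;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Sink;

// Toy bounded sink, purely for illustrating the Sink contract discussed above.
struct ToySink {
    queue: VecDeque<String>,
    capacity: usize,
}

impl Sink<String> for ToySink {
    type Error = Infallible;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.queue.len() < self.capacity {
            Poll::Ready(Ok(()))
        } else {
            // The "previous versions" approach: flush so start_send() has room.
            self.poll_flush(cx)
        }
    }

    fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
        self.queue.push_back(item);
        Ok(())
    }

    fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Pretend everything was written out; a real sink would write to the
        // underlying stream here and return Pending if it can't finish yet.
        self.queue.clear();
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(cx)
    }
}
```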
IMHO, if we really want to achieve "no flushes" behavior with the current API surface of the crates, the proper way of doing so would be to introduce the corresponding methods directly on the `WebSocketStream`, allowing the user to be in charge of the actual behavior. That way we respect the contract of the `Sink` and `Stream` traits (so `tokio-tungstenite` remains interoperable and easy to use) while allowing the user to call the new methods explicitly (`enqueue()`, `flush()`, or whatever we name them). Luckily, the new API of `tungstenite` (after your changes) allows us to implement such methods. I think explicit behavior is a safer bet, as it does not require the user to make assumptions about the internals of our sink implementation.
I replied in the pr https://github.com/snapview/tokio-tungstenite/pull/287#issuecomment-1595811343
> IMHO if we really want to achieve "no flushes" behavior
Note the PR isn't "no flushes"; it adds flushing to `poll_ready`, but only after `WouldBlock` errors.
Hi there --

We've been looking at some of our uses of tokio-tungstenite and as part of that we came across #284. It looks like #284 makes it so that `poll_ready` always returns `Poll::Ready`.

However, consider what happens if we call `SinkExt::feed` repeatedly while never flushing (or if we try flushing in a `select!` loop but it always gets cancelled before it's done) -- it would seem that we keep adding messages to the tungstenite buffer and never flushing anything. This seems to mean that there is no upper bound to the size of the buffer, i.e. that if we never call flush then the buffer can grow in an unbounded fashion.

In `poll_ready`, maybe flush if the buffer is larger than some size?

cc @alexheretic, @agalakhov