square / okhttp

Square’s meticulous HTTP client for the JVM, Android, and GraalVM.
https://square.github.io/okhttp/
Apache License 2.0

New API: WebSocket.flush() to await an empty queue #3317

Open jumperchen opened 7 years ago

jumperchen commented 7 years ago

If we send 1M messages in a loop, the websocket connection closes automatically with reason 1001. (This doesn't happen with other WebSocket clients, such as a browser or other Java WebSocket client implementations like Tyrus or NV-Websocket-Client.)

Here is the gist to reproduce: https://gist.github.com/jumperchen/6f8410b871a6d3746fe1967340d7a1a1

Version: 3.7.0 and 3.8.0-SNAPSHOT (master branch)
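
For reference, here is a minimal self-contained sketch of the failure mode described above (this is not the exact gist: the endpoint URL and message payloads are placeholders):

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;

public final class QueueOverflowRepro {
  public static void main(String[] args) {
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
        .url("ws://localhost:8080/echo") // placeholder endpoint, not from the gist
        .build();

    WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
      @Override public void onClosing(WebSocket webSocket, int code, String reason) {
        // Observed failure mode: the socket closes with code 1001 (going away).
        System.out.println("closing: " + code + " " + reason);
      }
    });

    // Enqueue messages far faster than the socket can write them. Once the
    // unsent bytes exceed the internal queue limit, send() starts returning
    // false and the connection is closed.
    for (int i = 0; i < 1_000_000; i++) {
      webSocket.send("message " + i);
    }
  }
}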

b95505017 commented 7 years ago

I think that works as intended. https://github.com/square/okhttp/blob/e172706b56a8616cf70f8d8285d5f8701c8a36a0/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java#L371

  private synchronized boolean send(ByteString data, int formatOpcode) {
    // Don't send new frames after we've failed or enqueued a close frame.
    if (failed || enqueuedClose) return false;

    // If this frame overflows the buffer, reject it and close the web socket.
    if (queueSize + data.size() > MAX_QUEUE_SIZE) {
      close(CLOSE_CLIENT_GOING_AWAY, null);
      return false;
    }

swankjesse commented 7 years ago

Yup. If you're deliberately sending a very large number of messages, you have to implement your own backpressure to slow down enqueueing so you don't exhaust memory. In this case you can check the queue size and sleep until it drains.

swankjesse commented 7 years ago

while (websocket.queueSize() > limit) {
  Thread.sleep(500);
}
websocket.send(message);
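
Applied to the original 1M-message scenario, that backpressure loop might look like the following (the 1 MiB threshold and 500 ms sleep are arbitrary illustrative values, not OkHttp defaults):

long limit = 1024L * 1024; // arbitrary: pause while more than 1 MiB is still queued
for (int i = 0; i < 1_000_000; i++) {
  while (webSocket.queueSize() > limit) {
    Thread.sleep(500); // give the writer thread time to drain the queue
  }
  webSocket.send("message " + i);
}
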
jumperchen commented 7 years ago

Is it possible to make MAX_QUEUE_SIZE configurable, or to make this class extendable? Otherwise we have no way to change it. (We don't want to copy the whole implementation.) :)

Thanks.

swankjesse commented 7 years ago

How much bigger do you want it?

Don’t think of the queue size as “the number of bytes you can send”; think of it as “the amount of memory you’re holding”. We want a hard limit because there’s not much benefit to holding a very large queue of data in memory.

We need backpressure so that clients can slow down sends when the websocket isn’t keeping up. The loop above is my recommended option.

jumperchen commented 7 years ago

Our client may have 128 GB of memory or more :) So making it configurable would be better. By the way, we use the socket.io Java client, which is based on okhttp (we don't use okhttp directly). Another thing I found is that okhttp's overhead seems to be bigger than nv-websocket-client's (may be related to https://github.com/square/okhttp/issues/1733). From my tests, okhttp is 2~3 times slower than nv-websocket-client, so we may consider switching the socket.io Java client's WebSocket implementation to boost performance.

swankjesse commented 7 years ago

We can take a look at websocket compression. If you'd like to investigate and send a PR, that'll accelerate it!

Making the send buffer configurable is an option. I'm worried that it's the wrong way to fix the problem; if the websocket does fail it's unfortunate to have wasted energy enqueueing data that won't be sent.

What about an API like flush() that just sleeps until all currently-enqueued messages have been transmitted?
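
(No such method exists yet; as a rough caller-side sketch of the proposed behaviour, one could poll queueSize() until the queue drains. The helper name and polling interval below are hypothetical.)

// Hypothetical helper approximating the proposed WebSocket.flush():
// block until everything currently enqueued has been handed to the socket.
static void awaitEmptyQueue(WebSocket webSocket) throws InterruptedException {
  while (webSocket.queueSize() > 0) {
    Thread.sleep(100); // simple polling; a real API could signal completion instead
  }
}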

jumperchen commented 7 years ago

Yes, a flush() API sounds better than the current behaviour of closing the connection unexpectedly.

MaxwellDAssistek commented 1 year ago

I would like to bump this issue, as I have also encountered a situation where it is necessary to ensure that the websocket has fully flushed its buffers.

swankjesse commented 1 year ago

Until we get it implemented, if you’d like to flush you can use this:

public void flush(WebSocket websocket) throws InterruptedException {
  long limit = 1024L * 1024 * 4; // Whatever limit you’d like
  while (websocket.queueSize() > limit) {
    Thread.sleep(500);
  }
}
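
For example, to wait for the queue to drain below the chosen limit before shutting down (the close code and reason here are illustrative):

// Illustrative usage: let the queue drain, then close cleanly.
flush(webSocket);
webSocket.close(1000, "done");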