websockets / ws

Simple to use, blazing fast and thoroughly tested WebSocket client and server for Node.js
MIT License

perf: cork socket for a micro task #2214

Closed ronag closed 6 months ago

ronag commented 6 months ago

Significantly improves performance when writing a lot of messages.

lpinca commented 6 months ago

Can you show some numbers? I would prefer not to make it the default. See also https://github.com/websockets/ws/pull/1999 and https://github.com/websockets/ws/pull/1797.

ronag commented 6 months ago

Given your response on 1797 I don't think you will agree with me.

I can monkey patch this so it's fine.

lpinca commented 6 months ago

This also introduces a behavior difference (probably irrelevant, but it needs more thought) if websocket.terminate() is called in the same tick after websocket.send(). The callback of socket.write() is not called because the socket is destroyed while it is corked.

ronag commented 6 months ago

if websocket.terminate() is called in the same tick after websocket.send(). The callback of socket.write() is not called because the socket is destroyed while it is corked.

The callback should be called with an error? I don't think it should be a problem. Don't you have tests for that?

lpinca commented 6 months ago

I think so but it should be addressed in Node.js core.

const net = require('net');

const server = net.createServer();

server.on('connection', function (socket) {
  socket.resume();
  socket.cork();

  socket.write('chunk1', function (err) {
    console.error(err);
  });
  socket.write('chunk2', function (err) {
    console.error(err);
  });

  process.nextTick(function () {
    console.log('uncorked');
    socket.uncork();
  });

  socket.destroy();
});

server.listen(0, function () {
  const socket = net.createConnection({
    port: server.address().port
  });

  socket.on('data', function (chunk) {
    console.log(chunk.toString());
  });
});

lpinca commented 6 months ago

Here is a benchmark run with an echo server.

import { WebSocketServer } from 'ws';

const server = new WebSocketServer(
  {
    allowSynchronousEvents: true,
    port: 8080
  },
  function () {
    console.log('Server listening on *:8080');
  }
);

server.on('connection', function (ws) {
  ws.on('message', function (data, isBinary) {
    ws.send(data, { binary: isBinary });
  });
});

Without this patch

$ ./load_test 32 127.0.0.1 8080 0 0 125
Using message size of 125 bytes
Running benchmark now...
Msg/sec: 81414.000000
Msg/sec: 83492.500000
Msg/sec: 82391.500000
Msg/sec: 81919.000000
Msg/sec: 84937.500000
Msg/sec: 86152.250000
Msg/sec: 86007.750000
Msg/sec: 85930.250000

With this patch

$ ./load_test 32 127.0.0.1 8080 0 0 125
Using message size of 125 bytes
Running benchmark now...
Msg/sec: 83286.250000
Msg/sec: 85554.500000
Msg/sec: 85760.250000
Msg/sec: 85684.750000
Msg/sec: 84287.000000
Msg/sec: 83487.750000
Msg/sec: 84072.750000
Msg/sec: 84051.000000

load_test is taken from https://github.com/uNetworking/uWebSockets/blob/master/benchmarks/load_test.c

It confirms my thoughts on https://github.com/websockets/ws/pull/1797. I also prefer websocket.send() to closely mimic socket.write() without corking by default.

ronag commented 6 months ago

That benchmark totally misses the point. This change has been a huge performance boost for us in production.

ronag commented 6 months ago

Don't worry about this PR. I thought it was a no-brainer and don't have the bandwidth to convince you if you think otherwise.

ronag commented 6 months ago

I'll have a look at the Node.js core thing. As far as I know it should work if you destroy the socket with an error during terminate.

lpinca commented 6 months ago

As far as I know it should work if you destroy the socket with an error during terminate.

An 'error' event is emitted on the socket, but the callbacks for corked writes are still not called. It is reproducible by calling socket.destroy(new Error('err')) here https://github.com/websockets/ws/pull/2214#issuecomment-2029338838.

How much is performance improved in your case and why is websocket.send() called several times in the same tick?

ronag commented 6 months ago

How much is performance improved in your case and why is websocket.send() called several times in the same tick?

I don't have hard numbers. It went from being the primary bottleneck in our profiling to almost negligible. I can't run your benchmark since I don't have load_test.

ronag commented 6 months ago

An 'error' event is emitted on the socket, but the callbacks for corked writes are still not called. It is reproducible by calling socket.destroy(new Error('err')) here https://github.com/websockets/ws/pull/2214#issuecomment-2029338838.

I will have a look.

lpinca commented 6 months ago

I can't run your benchmark since I don't have load_test

git clone https://github.com/uNetworking/uWebSockets.git
cd uWebSockets
git submodule init
git submodule update uSockets 
cd benchmarks
make

If you are on macOS and the build fails, this patch works

diff --git a/benchmarks/Makefile b/benchmarks/Makefile
index 9275d91..f498072 100644
--- a/benchmarks/Makefile
+++ b/benchmarks/Makefile
@@ -1,5 +1,5 @@
 default:
-   g++ -flto -march=native parser.cpp -O3 -I../uSockets/src -o parser
+#  g++ -flto -march=native parser.cpp -O3 -I../uSockets/src -o parser
    clang -flto -O3 -DLIBUS_USE_OPENSSL -I../uSockets/src ../uSockets/src/*.c ../uSockets/src/eventing/*.c ../uSockets/src/crypto/*.c broadcast_test.c load_test.c scale_test.c -c
    clang++ -flto -O3 -DLIBUS_USE_OPENSSL -I../uSockets/src ../uSockets/src/crypto/*.cpp -c -std=c++17
    clang++ -flto -O3 -DLIBUS_USE_OPENSSL `ls *.o | grep -Ev "load_test|scale_test"` -lssl -lcrypto -o broadcast_test

ronag commented 6 months ago

Yea, it's missing openssl/ssl.h and I don't know how to figure that out. brew install openssl is insufficient.

Just change your benchmark to do multiple sends in same tick:

import { WebSocketServer } from 'ws';

const server = new WebSocketServer(
  {
    allowSynchronousEvents: true,
    port: 8080
  },
  function () {
    console.log('Server listening on *:8080');
  }
);

server.on('connection', function (ws) {
  ws.on('message', function (data, isBinary) {
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
  });
});

lpinca commented 6 months ago

Try with brew install openssl@3.

Just change your benchmark to do multiple sends in same tick:

Yes, that breaks the benchmark anyway.

FWIW I'm posting a patch that also passes the existing tests

diff --git a/doc/ws.md b/doc/ws.md
index f79cfc9..c30f3bc 100644
--- a/doc/ws.md
+++ b/doc/ws.md
@@ -476,11 +476,8 @@ of binary protocols transferring large messages with multiple fragments.
 - {Number}

 The number of bytes of data that have been queued using calls to `send()` but
-not yet transmitted to the network. This deviates from the HTML standard in the
-following ways:
-
-1. If the data is immediately sent the value is `0`.
-1. All framing bytes are included.
+not yet transmitted to the network. This deviates from the WHATWG standard in
+that all framing bytes are included.

 ### websocket.close([code[, reason]])

diff --git a/lib/sender.js b/lib/sender.js
index 1ed04b0..aafed6b 100644
--- a/lib/sender.js
+++ b/lib/sender.js
@@ -34,6 +34,7 @@ class Sender {
     }

     this._socket = socket;
+    this._corked = false;

     this._firstFragment = true;
     this._compress = false;
@@ -463,11 +464,15 @@ class Sender {
    * @private
    */
   sendFrame(list, cb) {
-    if (list.length === 2) {
+    if (!this._corked) {
+      this._corked = true;
       this._socket.cork();
+      process.nextTick(uncork, this);
+    }
+
+    if (list.length === 2) {
       this._socket.write(list[0]);
       this._socket.write(list[1], cb);
-      this._socket.uncork();
     } else {
       this._socket.write(list[0], cb);
     }
@@ -475,3 +480,14 @@ class Sender {
 }

 module.exports = Sender;
+
+/**
+ * Uncorks a sender
+ *
+ * @param {Sender} sender The Sender instance
+ * @private
+ */
+function uncork(sender) {
+  sender._corked = false;
+  sender._socket.uncork();
+}
diff --git a/test/websocket.test.js b/test/websocket.test.js
index e1b3bd2..3733bfd 100644
--- a/test/websocket.test.js
+++ b/test/websocket.test.js
@@ -2080,7 +2080,7 @@ describe('WebSocket', () => {
       wss.on('connection', (ws) => {
         ws.close();

-        assert.strictEqual(ws.bufferedAmount, 0);
+        assert.strictEqual(ws.bufferedAmount, 2);

         ws.ping('hi', (err) => {
           assert.ok(err instanceof Error);
@@ -2249,7 +2249,7 @@ describe('WebSocket', () => {
       wss.on('connection', (ws) => {
         ws.close();

-        assert.strictEqual(ws.bufferedAmount, 0);
+        assert.strictEqual(ws.bufferedAmount, 2);

         ws.pong('hi', (err) => {
           assert.ok(err instanceof Error);
@@ -2493,7 +2493,7 @@ describe('WebSocket', () => {
       wss.on('connection', (ws) => {
         ws.close();

-        assert.strictEqual(ws.bufferedAmount, 0);
+        assert.strictEqual(ws.bufferedAmount, 2);

         ws.send('hi', (err) => {
           assert.ok(err instanceof Error);
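
The corking logic in the lib/sender.js hunk above can be tried in isolation. What follows is a minimal sketch against a plain Writable rather than a real socket, so it says nothing about actual throughput. `createSink` and `createCorkedWriter` are hypothetical names used only for illustration and are not part of ws. It assumes that a stream implementing writev() receives all chunks buffered while corked in a single call once it is uncorked.

```javascript
const { Writable } = require('stream');

// Stand-in for a socket: counts how often write() and writev() are invoked.
function createSink(stats) {
  return new Writable({
    write(chunk, encoding, callback) {
      stats.write++;
      callback();
    },
    writev(chunks, callback) {
      stats.writev++;
      stats.batched += chunks.length;
      callback();
    }
  });
}

// Hypothetical helper mirroring the patch: cork on the first write of a tick
// and uncork on the next tick, so that writes made in between are batched.
function createCorkedWriter(stream) {
  let corked = false;

  return function write(chunk, cb) {
    if (!corked) {
      corked = true;
      stream.cork();
      process.nextTick(function () {
        corked = false;
        stream.uncork();
      });
    }

    stream.write(chunk, cb);
  };
}

const plainStats = { write: 0, writev: 0, batched: 0 };
const corkedStats = { write: 0, writev: 0, batched: 0 };
const plainSink = createSink(plainStats);
const corkedSink = createSink(corkedStats);
const corkedWrite = createCorkedWriter(corkedSink);

// Several writes in the same tick, as in the modified echo server.
for (let i = 0; i < 3; i++) {
  plainSink.write(`frame${i}`);
  corkedWrite(`frame${i}`);
}

// While corked, the data is still counted as buffered in the stream.
const bufferedWhileCorked = corkedSink.writableLength;
const bufferedPlain = plainSink.writableLength;

setImmediate(function () {
  console.log('plain:', plainStats, 'buffered:', bufferedPlain);
  console.log('corked:', corkedStats, 'buffered:', bufferedWhileCorked);
});
```

In this sketch the corked stream hands its chunks to writev() in one batch, while the plain stream calls write() once per chunk. The non-zero writableLength of the corked stream is the same effect that changes the bufferedAmount assertions from 0 to 2 in the test hunks above.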

With regard to the missing callbacks, they are called with a plain Writable

const { Writable } = require('stream');

const writable = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
  writev(chunks, callback) {
    callback();
  }
});

writable.cork();

writable.write('foo', function (err) {
  console.log('foo callback');
  console.error(err);
});
writable.write('bar', function (err) {
  console.log('bar callback');
  console.error(err);
});

writable.destroy();

process.nextTick(function () {
  writable.uncork();
});

but the raised error

Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed
    at errorBuffer (node:internal/streams/writable:724:33)
    at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ERR_STREAM_DESTROYED'
}

would be misleading for the user as websocket.send() would be called before the socket is destroyed.

lpinca commented 6 months ago

That would be trivial to detect and provide a more useful error?

I don't know, I did not investigate. Currently, the user-provided callback is passed as is to socket.write(). Anyway, https://github.com/websockets/ws/pull/2214#issuecomment-2029338838 should be solved first.
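
For illustration only, detecting the misleading error could look roughly like the sketch below. It was not proposed in this thread as an implementation. `wrapCallback` and the replacement message are hypothetical, and the sketch uses a plain Writable, where the callbacks of corked writes are called, so it does not cover the net.Socket case from the linked comment.

```javascript
const { Writable } = require('stream');

// Hypothetical helper (not part of ws): wraps a user-provided callback and
// replaces ERR_STREAM_DESTROYED with an error that describes what happened.
function wrapCallback(cb) {
  return function (err) {
    if (err && err.code === 'ERR_STREAM_DESTROYED') {
      const wrapped = new Error(
        'The socket was destroyed before the data could be written'
      );
      wrapped.cause = err; // Keep the original error for debugging.
      cb(wrapped);
      return;
    }

    cb(err);
  };
}

const writable = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
  writev(chunks, callback) {
    callback();
  }
});

const messages = [];

writable.cork();

writable.write(
  'foo',
  wrapCallback(function (err) {
    messages.push(err.message);
    console.error(err.message);
  })
);

// Destroy the stream while the write is still buffered.
writable.destroy();

process.nextTick(function () {
  writable.uncork();
});
```

Whether such a wrapper is worth an extra closure per write is a separate question, since the callback is currently passed to socket.write() unchanged.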