Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: cork socket for a micro task #2214

Closed
wants to merge 2 commits into from
Closed

Conversation

ronag
Copy link
Contributor

@ronag ronag commented Apr 1, 2024

Significantly improves performance when writing a lot of messages.

@ronag ronag force-pushed the master branch 3 times, most recently from 89790d5 to dea76c7 Compare April 1, 2024 05:44
lib/sender.js Outdated Show resolved Hide resolved
@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

Can you show some numbers? I would prefer not to make it the default. See also #1999 and #1797.

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

Given your response on 1797 I don't think you will agree with me.

I can monkey patch this so it's fine.

@ronag ronag closed this Apr 1, 2024
@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

This also introduces a behavior difference (probably irrelevant but needs more thoughts) if websocket.terminate() is called in the same tick after websocket.send(). The callback of socket.write() is not called because the socket is destroyed while it is corked.

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

if websocket.terminate() is called in the same tick after websocket.send(). The callback of socket.write() is not called because the socket is destroyed while it is corked.

The callback should be called with an error? I do don't think it should be a problem. Don't you have tests for that?

@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

I think so but it should be addressed in Node.js core.

const net = require('net');

const server = net.createServer();

server.on('connection', function (socket) {
  socket.resume();
  socket.cork();

  socket.write('chunk1', function (err) {
    console.error(err);
  });
  socket.write('chunk2', function (err) {
    console.error(err);
  });

  process.nextTick(function () {
    console.log('uncorked');
    socket.uncork();
  });

  socket.destroy();
});

server.listen(0, function () {
  const socket = net.createConnection({
    port: server.address().port
  });

  socket.on('data', function (chunk) {
    console.log(chunk.toString());
  });
});

@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

Here is benchmark run with an echo server.

import { WebSocketServer } from 'ws';

const server = new WebSocketServer(
  {
    allowSynchronousEvents: true,
    port: 8080
  },
  function () {
    console.log('Server listening on *:8080');
  }
);

server.on('connection', function (ws) {
  ws.on('message', function (data, isBinary) {
    ws.send(data, { binary: isBinary });
  });
});

Without this patch

$ ./load_test 32 127.0.0.1 8080 0 0 125
Using message size of 125 bytes
Running benchmark now...
Msg/sec: 81414.000000
Msg/sec: 83492.500000
Msg/sec: 82391.500000
Msg/sec: 81919.000000
Msg/sec: 84937.500000
Msg/sec: 86152.250000
Msg/sec: 86007.750000
Msg/sec: 85930.250000

With this patch

$ ./load_test 32 127.0.0.1 8080 0 0 125
Using message size of 125 bytes
Running benchmark now...
Msg/sec: 83286.250000
Msg/sec: 85554.500000
Msg/sec: 85760.250000
Msg/sec: 85684.750000
Msg/sec: 84287.000000
Msg/sec: 83487.750000
Msg/sec: 84072.750000
Msg/sec: 84051.000000

load_test is taken from https://github.com/uNetworking/uWebSockets/blob/master/benchmarks/load_test.c

It confirms my thoughts on #1797. I also prefer websocket.send() to closely mimic socket.write() without corking by default.

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

That benchmark totally misses the point. This change has been a huge performance boost for us in production.

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

Don't worry about this PR. I thought it was a no brainer and don't have the bandwidth to convince you if you think otherwise.

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

I'll have a look at the node core thing. As far as I know if should work if you destroy the socket with an error during terminate.

@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

As far as I know if should work if you destroy the socket with an error during terminate.

An 'error' event is emitted on the socket, but the callbacks for corked writes are still not called. It is reproducible by calling socket.destroy(new Error('err')) here #2214 (comment).

How much is performance improved in your case and why is websocket.send() called several times in the same tick?

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

How much is performance improved in your case and why is websocket.send() called several times in the same tick?

I don't have hard numbers. It went from being the primary bottleneck in our profiling to almost negligible. I can't run your benchmark since I don't have load_test.

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

An 'error' event is emitted on the socket, but the callbacks for corked writes are still not called. It is reproducible by calling socket.destroy(new Error('err')) here #2214 (comment).

I will have a look.

@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

I can't run your benchmark since I don't have load_test

git clone https://github.com/uNetworking/uWebSockets.git
cd uWebSockets
git submodule init
git submodule update uSockets 
cd benchmarks
make

If you are on macOS and the build fails, this patch works

diff --git a/benchmarks/Makefile b/benchmarks/Makefile
index 9275d91..f498072 100644
--- a/benchmarks/Makefile
+++ b/benchmarks/Makefile
@@ -1,5 +1,5 @@
 default:
-	g++ -flto -march=native parser.cpp -O3 -I../uSockets/src -o parser
+# 	g++ -flto -march=native parser.cpp -O3 -I../uSockets/src -o parser
 	clang -flto -O3 -DLIBUS_USE_OPENSSL -I../uSockets/src ../uSockets/src/*.c ../uSockets/src/eventing/*.c ../uSockets/src/crypto/*.c broadcast_test.c load_test.c scale_test.c -c
 	clang++ -flto -O3 -DLIBUS_USE_OPENSSL -I../uSockets/src ../uSockets/src/crypto/*.cpp -c -std=c++17
 	clang++ -flto -O3 -DLIBUS_USE_OPENSSL `ls *.o | grep -Ev "load_test|scale_test"` -lssl -lcrypto -o broadcast_test

@ronag
Copy link
Contributor Author

ronag commented Apr 1, 2024

Yea, it's missing openssl/ssl.h and I don't know how to figure that out. brew install openssl is insufficient.

Just change your benchmark to do multiple sends in same tick:

import { WebSocketServer } from 'ws';

const server = new WebSocketServer(
  {
    allowSynchronousEvents: true,
    port: 8080
  },
  function () {
    console.log('Server listening on *:8080');
  }
);

server.on('connection', function (ws) {
  ws.on('message', function (data, isBinary) {
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
    ws.send(data, { binary: isBinary });
  });
});

@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

Try with brew install openssl@3.

Just change your benchmark to do multiple sends in same tick:

Yes, that breaks the benchmark anyway.

FWIW I'm posting a patch that also passes the existing tests

diff --git a/doc/ws.md b/doc/ws.md
index f79cfc9..c30f3bc 100644
--- a/doc/ws.md
+++ b/doc/ws.md
@@ -476,11 +476,8 @@ of binary protocols transferring large messages with multiple fragments.
 - {Number}
 
 The number of bytes of data that have been queued using calls to `send()` but
-not yet transmitted to the network. This deviates from the HTML standard in the
-following ways:
-
-1. If the data is immediately sent the value is `0`.
-1. All framing bytes are included.
+not yet transmitted to the network. This deviates from the WHATWG standard in
+that all framing bytes are included.
 
 ### websocket.close([code[, reason]])
 
diff --git a/lib/sender.js b/lib/sender.js
index 1ed04b0..aafed6b 100644
--- a/lib/sender.js
+++ b/lib/sender.js
@@ -34,6 +34,7 @@ class Sender {
     }
 
     this._socket = socket;
+    this._corked = false;
 
     this._firstFragment = true;
     this._compress = false;
@@ -463,11 +464,15 @@ class Sender {
    * @private
    */
   sendFrame(list, cb) {
-    if (list.length === 2) {
+    if (!this._corked) {
+      this._corked = true;
       this._socket.cork();
+      process.nextTick(uncork, this);
+    }
+
+    if (list.length === 2) {
       this._socket.write(list[0]);
       this._socket.write(list[1], cb);
-      this._socket.uncork();
     } else {
       this._socket.write(list[0], cb);
     }
@@ -475,3 +480,14 @@ class Sender {
 }
 
 module.exports = Sender;
+
+/**
+ * Uncorks a sender
+ *
+ * @param {Sender} sender The Sender instance
+ * @private
+ */
+function uncork(sender) {
+  sender._corked = false;
+  sender._socket.uncork();
+}
diff --git a/test/websocket.test.js b/test/websocket.test.js
index e1b3bd2..3733bfd 100644
--- a/test/websocket.test.js
+++ b/test/websocket.test.js
@@ -2080,7 +2080,7 @@ describe('WebSocket', () => {
       wss.on('connection', (ws) => {
         ws.close();
 
-        assert.strictEqual(ws.bufferedAmount, 0);
+        assert.strictEqual(ws.bufferedAmount, 2);
 
         ws.ping('hi', (err) => {
           assert.ok(err instanceof Error);
@@ -2249,7 +2249,7 @@ describe('WebSocket', () => {
       wss.on('connection', (ws) => {
         ws.close();
 
-        assert.strictEqual(ws.bufferedAmount, 0);
+        assert.strictEqual(ws.bufferedAmount, 2);
 
         ws.pong('hi', (err) => {
           assert.ok(err instanceof Error);
@@ -2493,7 +2493,7 @@ describe('WebSocket', () => {
       wss.on('connection', (ws) => {
         ws.close();
 
-        assert.strictEqual(ws.bufferedAmount, 0);
+        assert.strictEqual(ws.bufferedAmount, 2);
 
         ws.send('hi', (err) => {
           assert.ok(err instanceof Error);

WRT to the missing callbacks, they are called with a plain Writable

const { Writable } = require('stream');

const writable = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
  writev(chunks, callback) {
    callback();
  }
});

writable.cork();

writable.write('foo', function (err) {
  console.log('foo callback');
  console.error(err);
});
writable.write('bar', function (err) {
  console.log('bar callback');
  console.error(err);
});

writable.destroy();

process.nextTick(function () {
  writable.uncork();
});

but the raised error

Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed
    at errorBuffer (node:internal/streams/writable:724:33)
    at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ERR_STREAM_DESTROYED'
}

would be misleading for the user as websocket.send() would be called before the socket is destroyed.

@lpinca
Copy link
Member

lpinca commented Apr 1, 2024

That would be trivial to detect and provide a more useful error?

I don't know, I did not investigate. Currently, the user provided callback is passes as is to socket.write(). Anyway, #2214 (comment) should be solved first.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants