From d160344a6bfea16c8796423f1b2619476e64afef Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Tue, 23 Oct 2018 01:53:17 -0700 Subject: [PATCH 01/13] Do not waste time trying to compress when socket is closed --- lib/sender.js | 15 +++++++++ test/sender.test.js | 78 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 83 insertions(+), 10 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 060e55392..f630c9968 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -27,6 +27,20 @@ class Sender { this._bufferedBytes = 0; this._deflating = false; this._queue = []; + + this._socket.on('close', () => { + const err = new Error( + `WebSocket is not open: readyState ${this._socket.readyState} ` + ); + + while (this._queue.length) { + const params = this._queue.shift(); + + this._bufferedBytes -= params[1].length; + + params[params.length - 1](err); + } + }); } /** @@ -349,6 +363,7 @@ class Sender { const params = this._queue.shift(); this._bufferedBytes -= params[1].length; + params[0].apply(this, params.slice(1)); } } diff --git a/test/sender.test.js b/test/sender.test.js index 623ce5954..4857d1cc3 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -42,7 +42,8 @@ describe('Sender', function () { write: (data) => { assert.strictEqual(data[0] & 0x40, 0x40); if (++count === 3) done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -57,13 +58,62 @@ describe('Sender', function () { sender.send('hi', options); }); + it('does not attempt to compress enqueued messages after socket closes', function (done) { + const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); + const numMessages = 1000; + let numWritten = 0; + const sender = new Sender({ + write: (data) => { + assert.strictEqual(data[0] & 0x40, 0x40); + if (++numWritten > 1) done(new Error('Too many attempted writes')); + }, + on: (ev, cb) => { + if (ev === 'close') { + process.nextTick(cb); + } + } + }, { + 'permessage-deflate': perMessageDeflate + }); + + let numCompressed = 0; + + perMessageDeflate.compress = (data, fin, cb) => { + if (++numCompressed > 1) { + done(new Error('Compressed too many times')); + } + + setTimeout(() => cb(null, data), 1); + }; + + perMessageDeflate.accept([{}]); + + const options = { compress: true, fin: false }; + const array = new Uint8Array([0x68, 0x69]); + + sender.send(array.buffer, options, () => {}); + sender.send(array, options, () => {}); + + let numErrors = 0; + for (let i = 0; i < numMessages; ++i) { + sender.send('hi', options, (err) => { + if (!err) return; + + if (++numErrors === numMessages) { + done(); + } + }); + } + }); + it('does not compress data for small payloads', function (done) { const perMessageDeflate = new PerMessageDeflate(); const sender = new Sender({ write: (data) => { assert.notStrictEqual(data[0] & 0x40, 0x40); done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -86,7 +136,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 6); done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -110,7 +161,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 5); done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -134,7 +186,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 8); done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -158,7 +211,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 3); done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -175,7 +229,8 @@ describe('Sender', function () { const sender = new Sender({ write: () => { if (++count > 1e4) done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -202,7 +257,8 @@ describe('Sender', function () { assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69]))); if (count === 4) done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -228,7 +284,8 @@ describe('Sender', function () { assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69]))); if (count === 4) done(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); @@ -253,7 +310,8 @@ describe('Sender', function () { write: (data, cb) => { count++; if (cb) cb(); - } + }, + on: () => {} }, { 'permessage-deflate': perMessageDeflate }); From b265319131847075332286a4c3e33e360dea5600 Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Tue, 23 Oct 2018 01:54:52 -0700 Subject: [PATCH 02/13] Remove errant extra new line --- lib/sender.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/sender.js b/lib/sender.js index f630c9968..3bd698b4e 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -363,7 +363,6 @@ class Sender { const params = this._queue.shift(); this._bufferedBytes -= params[1].length; - params[0].apply(this, params.slice(1)); } } From d75800e7a691ac01cd3b2fcb3dc2a500179c5f07 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Tue, 23 Oct 2018 14:49:50 -0700 Subject: [PATCH 03/13] Use once instead of on Co-Authored-By: Evertras --- lib/sender.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sender.js b/lib/sender.js index 3bd698b4e..e6bc330be 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -28,7 +28,7 @@ class Sender { this._deflating = false; this._queue = []; - this._socket.on('close', () => { + this._socket.once('close', () => { const err = new Error( `WebSocket is not open: readyState ${this._socket.readyState} ` ); From c936a4cc835e4bfeae27359db9dcbe2b71cb65f7 Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Tue, 23 Oct 2018 15:26:17 -0700 Subject: [PATCH 04/13] Update sender tests to use a simple mock socket --- test/sender.test.js | 99 +++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 54 deletions(-) diff --git a/test/sender.test.js b/test/sender.test.js index 4857d1cc3..eef542dff 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -5,6 +5,18 @@ const assert = require('assert'); const PerMessageDeflate = require('../lib/permessage-deflate'); const Sender = require('../lib/sender'); +class MockSocket { + constructor ({ write, on, once } = {}) { + if (write) this.write = write; + if (on) this.on = on; + if (once) this.once = once; + } + + write () {} + on () {} + once () {} +} + describe('Sender', function () { describe('.frame', function () { it('does not mutate the input buffer if data is `readOnly`', function () { @@ -38,15 +50,13 @@ describe('Sender', function () { it('compresses data if compress option is enabled', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { assert.strictEqual(data[0] & 0x40, 0x40); if (++count === 3) done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -62,19 +72,18 @@ describe('Sender', function () { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); const numMessages = 1000; let numWritten = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { assert.strictEqual(data[0] & 0x40, 0x40); if (++numWritten > 1) done(new Error('Too many attempted writes')); }, - on: (ev, cb) => { + once: (ev, cb) => { if (ev === 'close') { process.nextTick(cb); } } - }, { - 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); let numCompressed = 0; @@ -108,15 +117,13 @@ describe('Sender', function () { it('does not compress data for small payloads', function (done) { const perMessageDeflate = new PerMessageDeflate(); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { assert.notStrictEqual(data[0] & 0x40, 0x40); done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -126,7 +133,7 @@ describe('Sender', function () { it('compresses all frames in a fragmented message', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 3 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -136,11 +143,9 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 6); done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -151,7 +156,7 @@ describe('Sender', function () { it('compresses no frames in a fragmented message', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 3 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -161,11 +166,9 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 5); done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -176,7 +179,7 @@ describe('Sender', function () { it('compresses empty buffer as first fragment', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -186,11 +189,9 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 8); done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -201,7 +202,7 @@ describe('Sender', function () { it('compresses empty buffer as last fragment', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -211,11 +212,9 @@ describe('Sender', function () { assert.strictEqual(fragments[1][0] & 0x40, 0x00); assert.strictEqual(fragments[1].length, 3); done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -226,14 +225,12 @@ describe('Sender', function () { it('handles many send calls while processing without crashing on flush', function (done) { let count = 0; const perMessageDeflate = new PerMessageDeflate(); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: () => { if (++count > 1e4) done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -251,17 +248,15 @@ describe('Sender', function () { it('works with multiple types of data', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { if (++count === 1) return; assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69]))); if (count === 4) done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -278,17 +273,15 @@ describe('Sender', function () { it('works with multiple types of data', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { if (++count === 1) return; assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69]))); if (count === 4) done(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); @@ -306,15 +299,13 @@ describe('Sender', function () { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data, cb) => { count++; if (cb) cb(); - }, - on: () => {} - }, { - 'permessage-deflate': perMessageDeflate + } }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); perMessageDeflate.accept([{}]); From d7805716e3ac683ecfed0cb7a60a20ddace0506f Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Tue, 23 Oct 2018 15:28:43 -0700 Subject: [PATCH 05/13] Check if callback is actually a function before calling it --- lib/sender.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/sender.js b/lib/sender.js index e6bc330be..0b462adc8 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -38,7 +38,11 @@ class Sender { this._bufferedBytes -= params[1].length; - params[params.length - 1](err); + const cb = params[params.length - 1]; + + if (typeof cb === 'function') { + cb(err); + } } }); } From 0444d5c977359002a17ecc106f1eb8937983cf9f Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Wed, 24 Oct 2018 22:46:16 -0700 Subject: [PATCH 06/13] Add tests to ensure erroring callbacks are in expected order with expected state --- lib/sender.js | 2 +- test/sender.test.js | 6 ++++-- test/websocket.test.js | 25 +++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 0b462adc8..6e35bb8f8 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -28,7 +28,7 @@ class Sender { this._deflating = false; this._queue = []; - this._socket.once('close', () => { + this._socket.prependOnceListener('close', () => { const err = new Error( `WebSocket is not open: readyState ${this._socket.readyState} ` ); diff --git a/test/sender.test.js b/test/sender.test.js index eef542dff..01e8aa389 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -6,15 +6,17 @@ const PerMessageDeflate = require('../lib/permessage-deflate'); const Sender = require('../lib/sender'); class MockSocket { - constructor ({ write, on, once } = {}) { + constructor ({ write, on, once, prependOnceListener } = {}) { if (write) this.write = write; if (on) this.on = on; if (once) this.once = once; + if (prependOnceListener) this.prependOnceListener = prependOnceListener; } write () {} on () {} once () {} + prependOnceListener () {} } describe('Sender', function () { @@ -77,7 +79,7 @@ describe('Sender', function () { assert.strictEqual(data[0] & 0x40, 0x40); if (++numWritten > 1) done(new Error('Too many attempted writes')); }, - once: (ev, cb) => { + prependOnceListener: (ev, cb) => { if (ev === 'close') { process.nextTick(cb); } diff --git a/test/websocket.test.js b/test/websocket.test.js index 25bdc8f8e..8deb6646c 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2040,6 +2040,31 @@ describe('WebSocket', function () { }); }); + it('reports the web socket as CLOSING in error callbacks when connection is terminated abnormally', function (done) { + const wss = new WebSocket.Server({ + perMessageDeflate: { threshold: 0 }, + port: 0 + }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } }); + const messages = []; + + ws.on('message', (message) => messages.push(message)); + ws.on('close', (code) => { + assert.strictEqual(code, 1006); + assert.deepStrictEqual(messages, ['qux']); + wss.close(done); + }); + }); + + wss.on('connection', (ws) => { + ws.send('qux', () => ws._socket.end()); + ws.send('foo', () => assert.strictEqual(ws.readyState, WebSocket.CLOSING)); + ws.send('bar'); + ws.send('baz'); + }); + }); + describe('#send', function () { it('ignores the `compress` option if the extension is disabled', function (done) { const wss = new WebSocket.Server({ port: 0 }, () => { From e2f4342e64ca6f507077934d57145a884b931db1 Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Wed, 24 Oct 2018 23:24:39 -0700 Subject: [PATCH 07/13] Better test for CLOSING state on buffered compression attempts --- test/websocket.test.js | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/test/websocket.test.js b/test/websocket.test.js index 8deb6646c..bc1c75dbb 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2041,6 +2041,7 @@ describe('WebSocket', function () { }); it('reports the web socket as CLOSING in error callbacks when connection is terminated abnormally', function (done) { + console.log('================================================= START'); const wss = new WebSocket.Server({ perMessageDeflate: { threshold: 0 }, port: 0 @@ -2051,17 +2052,23 @@ describe('WebSocket', function () { ws.on('message', (message) => messages.push(message)); ws.on('close', (code) => { + console.log('closing ws'); assert.strictEqual(code, 1006); - assert.deepStrictEqual(messages, ['qux']); - wss.close(done); + assert.deepStrictEqual(messages, []); + done(); }); }); wss.on('connection', (ws) => { - ws.send('qux', () => ws._socket.end()); - ws.send('foo', () => assert.strictEqual(ws.readyState, WebSocket.CLOSING)); - ws.send('bar'); - ws.send('baz'); + let checkState = () => { + if (ws._sender._queue.length) { + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + } + }; + for (let i = 0; i < 1000; ++i) { + ws.send('foo', { compress: true }, checkState); + } + wss.close(); }); }); From edda00cf99b60cc57cea700043db328fe2a77acb Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Wed, 24 Oct 2018 23:30:32 -0700 Subject: [PATCH 08/13] Remove errant console log --- test/websocket.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/websocket.test.js b/test/websocket.test.js index bc1c75dbb..abdc1fc58 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2041,7 +2041,6 @@ describe('WebSocket', function () { }); it('reports the web socket as CLOSING in error callbacks when connection is terminated abnormally', function (done) { - console.log('================================================= START'); const wss = new WebSocket.Server({ perMessageDeflate: { threshold: 0 }, port: 0 From 88c5474f55384e5b3a1e4cf6109585eb5755b8f8 Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Wed, 24 Oct 2018 23:50:16 -0700 Subject: [PATCH 09/13] Switch back to once instead of prepend, add comment for clarity instead --- lib/sender.js | 2 +- lib/websocket.js | 2 ++ test/sender.test.js | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 6e35bb8f8..0b462adc8 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -28,7 +28,7 @@ class Sender { this._deflating = false; this._queue = []; - this._socket.prependOnceListener('close', () => { + this._socket.once('close', () => { const err = new Error( `WebSocket is not open: readyState ${this._socket.readyState} ` ); diff --git a/lib/websocket.js b/lib/websocket.js index 92cb0f891..21c2f23ab 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -123,6 +123,8 @@ class WebSocket extends EventEmitter { maxPayload ); + // Note that we must instantiate the Sender before we add socket.on('close') below + // because we want the Sender to empty its queue before sending our own close event. this._sender = new Sender(socket, this._extensions); this._receiver = receiver; this._socket = socket; diff --git a/test/sender.test.js b/test/sender.test.js index 01e8aa389..13f20d048 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -79,7 +79,7 @@ describe('Sender', function () { assert.strictEqual(data[0] & 0x40, 0x40); if (++numWritten > 1) done(new Error('Too many attempted writes')); }, - prependOnceListener: (ev, cb) => { + once: (ev, cb) => { if (ev === 'close') { process.nextTick(cb); } From 35105d28a85ffaf7b8b9b77e888e026404e88247 Mon Sep 17 00:00:00 2001 From: Brandon Fulljames Date: Wed, 24 Oct 2018 23:53:10 -0700 Subject: [PATCH 10/13] Fix error message for queued sender callbacks --- lib/sender.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sender.js b/lib/sender.js index 0b462adc8..b43b60b31 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -30,7 +30,7 @@ class Sender { this._socket.once('close', () => { const err = new Error( - `WebSocket is not open: readyState ${this._socket.readyState} ` + `WebSocket is not open: readyState CLOSING` ); while (this._queue.length) { From 3303832dc49e73d4f5e49400c67d5d9134749b78 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Wed, 24 Oct 2018 23:57:11 -0700 Subject: [PATCH 11/13] Update lib/sender.js Co-Authored-By: Evertras --- lib/sender.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sender.js b/lib/sender.js index b43b60b31..3dc15b2eb 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -30,7 +30,7 @@ class Sender { this._socket.once('close', () => { const err = new Error( - `WebSocket is not open: readyState CLOSING` + `WebSocket is not open: readyState 2 (CLOSING)` ); while (this._queue.length) { From 20e9d857714932f670bf9e32ba2fa70d49ed501a Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Mon, 29 Oct 2018 09:01:01 +0100 Subject: [PATCH 12/13] Add the listener only when permessage-deflate is enabled --- lib/sender.js | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/lib/sender.js b/lib/sender.js index 3dc15b2eb..500bd98cd 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -28,23 +28,19 @@ class Sender { this._deflating = false; this._queue = []; - this._socket.once('close', () => { - const err = new Error( - `WebSocket is not open: readyState 2 (CLOSING)` - ); + if (this._extensions[PerMessageDeflate.extensionName]) { + this._socket.once('close', () => { + const err = new Error('WebSocket is not open: readyState 2 (CLOSING)'); - while (this._queue.length) { - const params = this._queue.shift(); + while (this._queue.length) { + const params = this._queue.shift(); + const cb = params[params.length - 1]; - this._bufferedBytes -= params[1].length; - - const cb = params[params.length - 1]; - - if (typeof cb === 'function') { - cb(err); + this._bufferedBytes -= params[1].length; + if (typeof cb === 'function') cb(err); } - } - }); + }); + } } /** From dc906384484044069f18e9198d7b9a6afeaf3682 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Mon, 29 Oct 2018 13:52:33 +0100 Subject: [PATCH 13/13] Fix nits --- lib/websocket.js | 7 ++- test/sender.test.js | 99 ++++++++++++++++++++++-------------------- test/websocket.test.js | 43 +++++++++--------- 3 files changed, 81 insertions(+), 68 deletions(-) diff --git a/lib/websocket.js b/lib/websocket.js index 21c2f23ab..845bd2dba 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -123,8 +123,11 @@ class WebSocket extends EventEmitter { maxPayload ); - // Note that we must instantiate the Sender before we add socket.on('close') below - // because we want the Sender to empty its queue before sending our own close event. + // + // `Sender` must be instantiated before adding the `socketOnClose` listener. + // This allows the sender queue, when used, to be emptied before + // `socketOnClose` is called. + // this._sender = new Sender(socket, this._extensions); this._receiver = receiver; this._socket = socket; diff --git a/test/sender.test.js b/test/sender.test.js index 13f20d048..e7c7dd104 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -6,17 +6,13 @@ const PerMessageDeflate = require('../lib/permessage-deflate'); const Sender = require('../lib/sender'); class MockSocket { - constructor ({ write, on, once, prependOnceListener } = {}) { + constructor ({ write, once } = {}) { if (write) this.write = write; - if (on) this.on = on; if (once) this.once = once; - if (prependOnceListener) this.prependOnceListener = prependOnceListener; } write () {} - on () {} once () {} - prependOnceListener () {} } describe('Sender', function () { @@ -58,7 +54,9 @@ describe('Sender', function () { if (++count === 3) done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -70,49 +68,40 @@ describe('Sender', function () { sender.send('hi', options); }); - it('does not attempt to compress enqueued messages after socket closes', function (done) { - const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); - const numMessages = 1000; - let numWritten = 0; + it('does not compress enqueued messages after socket closes', function (done) { + const numMessages = 100; + let numErrors = 0; + const mockSocket = new MockSocket({ write: (data) => { - assert.strictEqual(data[0] & 0x40, 0x40); - if (++numWritten > 1) done(new Error('Too many attempted writes')); + // Test that `PerMessageDeflate#compress()` and `Socket#write()` is + // called only once (for the first message that is not queued). + assert.strictEqual(numErrors, numMessages); + done(); }, once: (ev, cb) => { - if (ev === 'close') { - process.nextTick(cb); - } + if (ev === 'close') process.nextTick(cb); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); - - let numCompressed = 0; - - perMessageDeflate.compress = (data, fin, cb) => { - if (++numCompressed > 1) { - done(new Error('Compressed too many times')); - } - - setTimeout(() => cb(null, data), 1); - }; + const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); perMessageDeflate.accept([{}]); - const options = { compress: true, fin: false }; - const array = new Uint8Array([0x68, 0x69]); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); - sender.send(array.buffer, options, () => {}); - sender.send(array, options, () => {}); + const options = { compress: true, fin: true }; + sender.send('hi', options); - let numErrors = 0; - for (let i = 0; i < numMessages; ++i) { + for (let i = 0; i < numMessages; i++) { sender.send('hi', options, (err) => { - if (!err) return; - - if (++numErrors === numMessages) { - done(); - } + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'WebSocket is not open: readyState 2 (CLOSING)' + ); + numErrors++; }); } }); @@ -125,7 +114,9 @@ describe('Sender', function () { done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -147,7 +138,9 @@ describe('Sender', function () { done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -170,7 +163,9 @@ describe('Sender', function () { done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -193,7 +188,9 @@ describe('Sender', function () { done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -216,7 +213,9 @@ describe('Sender', function () { done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -232,7 +231,9 @@ describe('Sender', function () { if (++count > 1e4) done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -258,7 +259,9 @@ describe('Sender', function () { if (count === 4) done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -283,7 +286,9 @@ describe('Sender', function () { if (count === 4) done(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); @@ -307,7 +312,9 @@ describe('Sender', function () { if (cb) cb(); } }); - const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); perMessageDeflate.accept([{}]); diff --git a/test/websocket.test.js b/test/websocket.test.js index abdc1fc58..b78429e1c 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2040,33 +2040,36 @@ describe('WebSocket', function () { }); }); - it('reports the web socket as CLOSING in error callbacks when connection is terminated abnormally', function (done) { + it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) { const wss = new WebSocket.Server({ perMessageDeflate: { threshold: 0 }, port: 0 }, () => { - const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { - perMessageDeflate: { threshold: 0 } }); - const messages = []; - - ws.on('message', (message) => messages.push(message)); - ws.on('close', (code) => { - console.log('closing ws'); - assert.strictEqual(code, 1006); - assert.deepStrictEqual(messages, []); - done(); - }); + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); }); wss.on('connection', (ws) => { - let checkState = () => { - if (ws._sender._queue.length) { - assert.strictEqual(ws.readyState, WebSocket.CLOSING); - } - }; - for (let i = 0; i < 1000; ++i) { - ws.send('foo', { compress: true }, checkState); - } + const map = () => crypto.randomBytes(16); + let count = 100; + + Array.from({ length: count }).map(map).forEach((data, i) => { + ws.send(data, (err) => { + assert.ok(err instanceof Error); + + if (i > 0) { + // The first message is not queued and compression completes after + // the `'close'` event is emitted on the `net.Socket`. + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.strictEqual( + err.message, + 'WebSocket is not open: readyState 2 (CLOSING)' + ); + } + + if (--count === 0) done(); + }); + }); + wss.close(); }); });