diff --git a/lib/sender.js b/lib/sender.js index 060e55392..500bd98cd 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -27,6 +27,20 @@ class Sender { this._bufferedBytes = 0; this._deflating = false; this._queue = []; + + if (this._extensions[PerMessageDeflate.extensionName]) { + this._socket.once('close', () => { + const err = new Error('WebSocket is not open: readyState 2 (CLOSING)'); + + while (this._queue.length) { + const params = this._queue.shift(); + const cb = params[params.length - 1]; + + this._bufferedBytes -= params[1].length; + if (typeof cb === 'function') cb(err); + } + }); + } } /** diff --git a/lib/websocket.js b/lib/websocket.js index 92cb0f891..845bd2dba 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -123,6 +123,11 @@ class WebSocket extends EventEmitter { maxPayload ); + // + // `Sender` must be instantiated before adding the `socketOnClose` listener. + // This allows the sender queue, when used, to be emptied before + // `socketOnClose` is called. + // this._sender = new Sender(socket, this._extensions); this._receiver = receiver; this._socket = socket; diff --git a/test/sender.test.js b/test/sender.test.js index 623ce5954..e7c7dd104 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -5,6 +5,16 @@ const assert = require('assert'); const PerMessageDeflate = require('../lib/permessage-deflate'); const Sender = require('../lib/sender'); +class MockSocket { + constructor ({ write, once } = {}) { + if (write) this.write = write; + if (once) this.once = once; + } + + write () {} + once () {} +} + describe('Sender', function () { describe('.frame', function () { it('does not mutate the input buffer if data is `readOnly`', function () { @@ -38,12 +48,13 @@ describe('Sender', function () { it('compresses data if compress option is enabled', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { assert.strictEqual(data[0] & 0x40, 0x40); if (++count === 3) done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -57,14 +68,53 @@ describe('Sender', function () { sender.send('hi', options); }); + it('does not compress enqueued messages after socket closes', function (done) { + const numMessages = 100; + let numErrors = 0; + + const mockSocket = new MockSocket({ + write: (data) => { + // Test that `PerMessageDeflate#compress()` and `Socket#write()` is + // called only once (for the first message that is not queued). + assert.strictEqual(numErrors, numMessages); + done(); + }, + once: (ev, cb) => { + if (ev === 'close') process.nextTick(cb); + } + }); + + const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); + perMessageDeflate.accept([{}]); + + const sender = new Sender(mockSocket, { + 'permessage-deflate': perMessageDeflate + }); + + const options = { compress: true, fin: true }; + sender.send('hi', options); + + for (let i = 0; i < numMessages; i++) { + sender.send('hi', options, (err) => { + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'WebSocket is not open: readyState 2 (CLOSING)' + ); + numErrors++; + }); + } + }); + it('does not compress data for small payloads', function (done) { const perMessageDeflate = new PerMessageDeflate(); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { assert.notStrictEqual(data[0] & 0x40, 0x40); done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -76,7 +126,7 @@ describe('Sender', function () { it('compresses all frames in a fragmented message', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 3 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -87,7 +137,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1].length, 6); done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -100,7 +151,7 @@ describe('Sender', function () { it('compresses no frames in a fragmented message', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 3 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -111,7 +162,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1].length, 5); done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -124,7 +176,7 @@ describe('Sender', function () { it('compresses empty buffer as first fragment', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -135,7 +187,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1].length, 8); done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -148,7 +201,7 @@ describe('Sender', function () { it('compresses empty buffer as last fragment', function (done) { const fragments = []; const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { fragments.push(data); if (fragments.length !== 2) return; @@ -159,7 +212,8 @@ describe('Sender', function () { assert.strictEqual(fragments[1].length, 3); done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -172,11 +226,12 @@ describe('Sender', function () { it('handles many send calls while processing without crashing on flush', function (done) { let count = 0; const perMessageDeflate = new PerMessageDeflate(); - const sender = new Sender({ + const mockSocket = new MockSocket({ write: () => { if (++count > 1e4) done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -196,14 +251,15 @@ describe('Sender', function () { it('works with multiple types of data', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { if (++count === 1) return; assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69]))); if (count === 4) done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -222,14 +278,15 @@ describe('Sender', function () { it('works with multiple types of data', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data) => { if (++count === 1) return; assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69]))); if (count === 4) done(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); @@ -249,12 +306,13 @@ describe('Sender', function () { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); let count = 0; - const sender = new Sender({ + const mockSocket = new MockSocket({ write: (data, cb) => { count++; if (cb) cb(); } - }, { + }); + const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); diff --git a/test/websocket.test.js b/test/websocket.test.js index 25bdc8f8e..b78429e1c 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2040,6 +2040,40 @@ describe('WebSocket', function () { }); }); + it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) { + const wss = new WebSocket.Server({ + perMessageDeflate: { threshold: 0 }, + port: 0 + }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + const map = () => crypto.randomBytes(16); + let count = 100; + + Array.from({ length: count }).map(map).forEach((data, i) => { + ws.send(data, (err) => { + assert.ok(err instanceof Error); + + if (i > 0) { + // The first message is not queued and compression completes after + // the `'close'` event is emitted on the `net.Socket`. + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.strictEqual( + err.message, + 'WebSocket is not open: readyState 2 (CLOSING)' + ); + } + + if (--count === 0) done(); + }); + }); + + wss.close(); + }); + }); + describe('#send', function () { it('ignores the `compress` option if the extension is disabled', function (done) { const wss = new WebSocket.Server({ port: 0 }, () => {