diff --git a/lib/sender.js b/lib/sender.js index 500bd98..12ee25a 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -27,20 +27,6 @@ class Sender { this._bufferedBytes = 0; this._deflating = false; this._queue = []; - - if (this._extensions[PerMessageDeflate.extensionName]) { - this._socket.once('close', () => { - const err = new Error('WebSocket is not open: readyState 2 (CLOSING)'); - - while (this._queue.length) { - const params = this._queue.shift(); - const cb = params[params.length - 1]; - - this._bufferedBytes -= params[1].length; - if (typeof cb === 'function') cb(err); - } - }); - } } /** @@ -346,9 +332,19 @@ class Sender { this._deflating = true; perMessageDeflate.compress(data, options.fin, (_, buf) => { + this._deflating = false; + + if (!this._socket.readable && !this._socket.writable) { + // + // The socket is closed. Clear the queue and bail out. + // + this._bufferedBytes = 0; + this._queue.length = 0; + return; + } + options.readOnly = false; this.sendFrame(Sender.frame(buf, options), cb); - this._deflating = false; this.dequeue(); }); } diff --git a/lib/websocket.js b/lib/websocket.js index 79cb219..5b8497d 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -123,11 +123,6 @@ class WebSocket extends EventEmitter { maxPayload ); - // - // `Sender` must be instantiated before adding the `socketOnClose` listener. - // This allows the sender queue, when used, to be emptied before - // `socketOnClose` is called. - // this._sender = new Sender(socket, this._extensions); this._receiver = receiver; this._socket = socket; diff --git a/test/sender.test.js b/test/sender.test.js index e7c7dd1..0440752 100644 --- a/test/sender.test.js +++ b/test/sender.test.js @@ -6,13 +6,14 @@ const PerMessageDeflate = require('../lib/permessage-deflate'); const Sender = require('../lib/sender'); class MockSocket { - constructor ({ write, once } = {}) { + constructor ({ write } = {}) { + this.readable = true; + this.writable = true; + if (write) this.write = write; - if (once) this.once = once; } write () {} - once () {} } describe('Sender', function () { @@ -69,41 +70,43 @@ describe('Sender', function () { }); it('does not compress enqueued messages after socket closes', function (done) { - const numMessages = 100; - let numErrors = 0; - const mockSocket = new MockSocket({ - write: (data) => { - // Test that `PerMessageDeflate#compress()` and `Socket#write()` is - // called only once (for the first message that is not queued). - assert.strictEqual(numErrors, numMessages); - done(); - }, - once: (ev, cb) => { - if (ev === 'close') process.nextTick(cb); - } + write: () => done(new Error('Unexpected call to socket.write()')) }); const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); perMessageDeflate.accept([{}]); + const compress = perMessageDeflate.compress; const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate }); - const options = { compress: true, fin: true }; - sender.send('hi', options); + perMessageDeflate.compress = (data, fin, callback) => { + compress.call(perMessageDeflate, data, fin, (_, buf) => { + assert.strictEqual(sender._bufferedBytes, 198); + assert.strictEqual(sender._queue.length, 99); + assert.strictEqual(mockSocket.readable, false); + assert.strictEqual(mockSocket.writable, false); - for (let i = 0; i < numMessages; i++) { - sender.send('hi', options, (err) => { - assert.ok(err instanceof Error); - assert.strictEqual( - err.message, - 'WebSocket is not open: readyState 2 (CLOSING)' - ); - numErrors++; + process.nextTick(() => { + assert.strictEqual(sender._bufferedBytes, 0); + assert.strictEqual(sender._queue.length, 0); + done(); + }); + + callback(_, buf); }); - } + }; + + const options = { compress: true, fin: true }; + + for (let i = 0; i < 100; i++) sender.send('hi', options); + + process.nextTick(() => { + mockSocket.readable = false; + mockSocket.writable = false; + }); }); it('does not compress data for small payloads', function (done) { diff --git a/test/websocket.test.js b/test/websocket.test.js index dc94586..c849ade 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2041,40 +2041,6 @@ describe('WebSocket', function () { }); }); - it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) { - const wss = new WebSocket.Server({ - perMessageDeflate: { threshold: 0 }, - port: 0 - }, () => { - const ws = new WebSocket(`ws://localhost:${wss.address().port}`); - }); - - wss.on('connection', (ws) => { - const map = () => crypto.randomBytes(16); - let count = 100; - - Array.from({ length: count }).map(map).forEach((data, i) => { - ws.send(data, (err) => { - assert.ok(err instanceof Error); - - if (i > 0) { - // The first message is not queued and compression completes after - // the `'close'` event is emitted on the `net.Socket`. - assert.strictEqual(ws.readyState, WebSocket.CLOSING); - assert.strictEqual( - err.message, - 'WebSocket is not open: readyState 2 (CLOSING)' - ); - } - - if (--count === 0) done(); - }); - }); - - wss.close(); - }); - }); - describe('#send', function () { it('ignores the `compress` option if the extension is disabled', function (done) { const wss = new WebSocket.Server({ port: 0 }, () => { @@ -2106,13 +2072,16 @@ describe('WebSocket', function () { }); ws.on('open', () => { - ws.send('hi', (err) => { - assert.ok(err instanceof Error); - wss.close(done); - }); + ws.send('hi', () => done(new Error('Unexpected callback invocation'))); ws.terminate(); }); }); + + wss.on('connection', (ws) => { + ws.on('close', () => { + wss.close(done); + }); + }); }); it('can be used while data is being decompressed', function (done) {