Skip to content

Commit

Permalink
[minor] Ignore callbacks when clearing the send queue (#1471)
Browse files Browse the repository at this point in the history
Do not invoke callbacks when clearing the send queue due to premature
socket closure.

Refs: #1464 (comment)
Fixes #1226
  • Loading branch information
lpinca committed Nov 14, 2018
1 parent 5914206 commit f26fac8
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 84 deletions.
26 changes: 11 additions & 15 deletions lib/sender.js
Expand Up @@ -27,20 +27,6 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

if (this._extensions[PerMessageDeflate.extensionName]) {
this._socket.once('close', () => {
const err = new Error('WebSocket is not open: readyState 2 (CLOSING)');

while (this._queue.length) {
const params = this._queue.shift();
const cb = params[params.length - 1];

this._bufferedBytes -= params[1].length;
if (typeof cb === 'function') cb(err);
}
});
}
}

/**
Expand Down Expand Up @@ -346,9 +332,19 @@ class Sender {

this._deflating = true;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
this._deflating = false;

if (!this._socket.readable && !this._socket.writable) {
//
// The socket is closed. Clear the queue and bail out.
//
this._bufferedBytes = 0;
this._queue.length = 0;
return;
}

options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
this._deflating = false;
this.dequeue();
});
}
Expand Down
5 changes: 0 additions & 5 deletions lib/websocket.js
Expand Up @@ -123,11 +123,6 @@ class WebSocket extends EventEmitter {
maxPayload
);

//
// `Sender` must be instantiated before adding the `socketOnClose` listener.
// This allows the sender queue, when used, to be emptied before
// `socketOnClose` is called.
//
this._sender = new Sender(socket, this._extensions);
this._receiver = receiver;
this._socket = socket;
Expand Down
55 changes: 29 additions & 26 deletions test/sender.test.js
Expand Up @@ -6,13 +6,14 @@ const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, once } = {}) {
constructor ({ write } = {}) {
this.readable = true;
this.writable = true;

if (write) this.write = write;
if (once) this.once = once;
}

write () {}
once () {}
}

describe('Sender', function () {
Expand Down Expand Up @@ -69,41 +70,43 @@ describe('Sender', function () {
});

it('does not compress enqueued messages after socket closes', function (done) {
const numMessages = 100;
let numErrors = 0;

const mockSocket = new MockSocket({
write: (data) => {
// Test that `PerMessageDeflate#compress()` and `Socket#write()` is
// called only once (for the first message that is not queued).
assert.strictEqual(numErrors, numMessages);
done();
},
once: (ev, cb) => {
if (ev === 'close') process.nextTick(cb);
}
write: () => done(new Error('Unexpected call to socket.write()'))
});

const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
perMessageDeflate.accept([{}]);

const compress = perMessageDeflate.compress;
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

const options = { compress: true, fin: true };
sender.send('hi', options);
perMessageDeflate.compress = (data, fin, callback) => {
compress.call(perMessageDeflate, data, fin, (_, buf) => {
assert.strictEqual(sender._bufferedBytes, 198);
assert.strictEqual(sender._queue.length, 99);
assert.strictEqual(mockSocket.readable, false);
assert.strictEqual(mockSocket.writable, false);

for (let i = 0; i < numMessages; i++) {
sender.send('hi', options, (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
numErrors++;
process.nextTick(() => {
assert.strictEqual(sender._bufferedBytes, 0);
assert.strictEqual(sender._queue.length, 0);
done();
});

callback(_, buf);
});
}
};

const options = { compress: true, fin: true };

for (let i = 0; i < 100; i++) sender.send('hi', options);

process.nextTick(() => {
mockSocket.readable = false;
mockSocket.writable = false;
});
});

it('does not compress data for small payloads', function (done) {
Expand Down
45 changes: 7 additions & 38 deletions test/websocket.test.js
Expand Up @@ -2041,40 +2041,6 @@ describe('WebSocket', function () {
});
});

it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) {
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const map = () => crypto.randomBytes(16);
let count = 100;

Array.from({ length: count }).map(map).forEach((data, i) => {
ws.send(data, (err) => {
assert.ok(err instanceof Error);

if (i > 0) {
// The first message is not queued and compression completes after
// the `'close'` event is emitted on the `net.Socket`.
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
}

if (--count === 0) done();
});
});

wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down Expand Up @@ -2106,13 +2072,16 @@ describe('WebSocket', function () {
});

ws.on('open', () => {
ws.send('hi', (err) => {
assert.ok(err instanceof Error);
wss.close(done);
});
ws.send('hi', () => done(new Error('Unexpected callback invocation')));
ws.terminate();
});
});

wss.on('connection', (ws) => {
ws.on('close', () => {
wss.close(done);
});
});
});

it('can be used while data is being decompressed', function (done) {
Expand Down

0 comments on commit f26fac8

Please sign in to comment.