Skip to content

Commit

Permalink
[minor] Ignore callbacks when clearing the send queue
Browse files Browse the repository at this point in the history
Do not invoke callbacks when clearing the send queue due to premature
socket closure.

Refs: #1464 (comment)

Fixes #1226
  • Loading branch information
lpinca committed Nov 10, 2018
1 parent 5914206 commit f0d092c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 84 deletions.
26 changes: 11 additions & 15 deletions lib/sender.js
Expand Up @@ -27,20 +27,6 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

if (this._extensions[PerMessageDeflate.extensionName]) {
this._socket.once('close', () => {
const err = new Error('WebSocket is not open: readyState 2 (CLOSING)');

while (this._queue.length) {
const params = this._queue.shift();
const cb = params[params.length - 1];

this._bufferedBytes -= params[1].length;
if (typeof cb === 'function') cb(err);
}
});
}
}

/**
Expand Down Expand Up @@ -346,9 +332,19 @@ class Sender {

this._deflating = true;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
this._deflating = false;

if (!this._socket.readable && !this._socket.writable) {
//
// The socket is closed. Clear the queue and bail out.
//
this._bufferedBytes = 0;
this._queue.length = 0;
return;
}

options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
this._deflating = false;
this.dequeue();
});
}
Expand Down
5 changes: 0 additions & 5 deletions lib/websocket.js
Expand Up @@ -123,11 +123,6 @@ class WebSocket extends EventEmitter {
maxPayload
);

//
// `Sender` must be instantiated before adding the `socketOnClose` listener.
// This allows the sender queue, when used, to be emptied before
// `socketOnClose` is called.
//
this._sender = new Sender(socket, this._extensions);
this._receiver = receiver;
this._socket = socket;
Expand Down
55 changes: 29 additions & 26 deletions test/sender.test.js
Expand Up @@ -6,13 +6,14 @@ const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, once } = {}) {
constructor ({ write } = {}) {
this.readable = true;
this.writable = true;

if (write) this.write = write;
if (once) this.once = once;
}

write () {}
once () {}
}

describe('Sender', function () {
Expand Down Expand Up @@ -69,41 +70,43 @@ describe('Sender', function () {
});

it('does not compress enqueued messages after socket closes', function (done) {
const numMessages = 100;
let numErrors = 0;

const mockSocket = new MockSocket({
write: (data) => {
// Test that `PerMessageDeflate#compress()` and `Socket#write()` is
// called only once (for the first message that is not queued).
assert.strictEqual(numErrors, numMessages);
done();
},
once: (ev, cb) => {
if (ev === 'close') process.nextTick(cb);
}
write: () => done(new Error('Unexpected call to socket.write()'))
});

const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
perMessageDeflate.accept([{}]);

const compress = perMessageDeflate.compress;
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

const options = { compress: true, fin: true };
sender.send('hi', options);
perMessageDeflate.compress = (data, fin, callback) => {
compress.call(perMessageDeflate, data, fin, (_, buf) => {
assert.strictEqual(sender._bufferedBytes, 198);
assert.strictEqual(sender._queue.length, 99);
assert.strictEqual(mockSocket.readable, false);
assert.strictEqual(mockSocket.writable, false);

for (let i = 0; i < numMessages; i++) {
sender.send('hi', options, (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
numErrors++;
process.nextTick(() => {
assert.strictEqual(sender._bufferedBytes, 0);
assert.strictEqual(sender._queue.length, 0);
done();
});

callback(_, buf);
});
}
};

const options = { compress: true, fin: true };

for (let i = 0; i < 100; i++) sender.send('hi', options);

process.nextTick(() => {
mockSocket.readable = false;
mockSocket.writable = false;
});
});

it('does not compress data for small payloads', function (done) {
Expand Down
45 changes: 7 additions & 38 deletions test/websocket.test.js
Expand Up @@ -2041,40 +2041,6 @@ describe('WebSocket', function () {
});
});

it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) {
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const map = () => crypto.randomBytes(16);
let count = 100;

Array.from({ length: count }).map(map).forEach((data, i) => {
ws.send(data, (err) => {
assert.ok(err instanceof Error);

if (i > 0) {
// The first message is not queued and compression completes after
// the `'close'` event is emitted on the `net.Socket`.
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
}

if (--count === 0) done();
});
});

wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down Expand Up @@ -2106,13 +2072,16 @@ describe('WebSocket', function () {
});

ws.on('open', () => {
ws.send('hi', (err) => {
assert.ok(err instanceof Error);
wss.close(done);
});
ws.send('hi', () => done(new Error('Unexpected callback invocation')));
ws.terminate();
});
});

wss.on('connection', (ws) => {
ws.on('close', () => {
wss.close(done);
});
});
});

it('can be used while data is being decompressed', function (done) {
Expand Down

0 comments on commit f0d092c

Please sign in to comment.