Skip to content

Commit

Permalink
[fix] Do not waste time compressing when socket is closed (#1464)
Browse files Browse the repository at this point in the history
  • Loading branch information
Evertras authored and lpinca committed Nov 6, 2018
1 parent 1ebff19 commit 7d51fb9
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 20 deletions.
14 changes: 14 additions & 0 deletions lib/sender.js
Expand Up @@ -27,6 +27,20 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

if (this._extensions[PerMessageDeflate.extensionName]) {
this._socket.once('close', () => {
const err = new Error('WebSocket is not open: readyState 2 (CLOSING)');

while (this._queue.length) {
const params = this._queue.shift();
const cb = params[params.length - 1];

this._bufferedBytes -= params[1].length;
if (typeof cb === 'function') cb(err);
}
});
}
}

/**
Expand Down
5 changes: 5 additions & 0 deletions lib/websocket.js
Expand Up @@ -123,6 +123,11 @@ class WebSocket extends EventEmitter {
maxPayload
);

//
// `Sender` must be instantiated before adding the `socketOnClose` listener.
// This allows the sender queue, when used, to be emptied before
// `socketOnClose` is called.
//
this._sender = new Sender(socket, this._extensions);
this._receiver = receiver;
this._socket = socket;
Expand Down
98 changes: 78 additions & 20 deletions test/sender.test.js
Expand Up @@ -5,6 +5,16 @@ const assert = require('assert');
const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, once } = {}) {
if (write) this.write = write;
if (once) this.once = once;
}

write () {}
once () {}
}

describe('Sender', function () {
describe('.frame', function () {
it('does not mutate the input buffer if data is `readOnly`', function () {
Expand Down Expand Up @@ -38,12 +48,13 @@ describe('Sender', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.strictEqual(data[0] & 0x40, 0x40);
if (++count === 3) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -57,14 +68,53 @@ describe('Sender', function () {
sender.send('hi', options);
});

it('does not compress enqueued messages after socket closes', function (done) {
const numMessages = 100;
let numErrors = 0;

const mockSocket = new MockSocket({
write: (data) => {
// Test that `PerMessageDeflate#compress()` and `Socket#write()` is
// called only once (for the first message that is not queued).
assert.strictEqual(numErrors, numMessages);
done();
},
once: (ev, cb) => {
if (ev === 'close') process.nextTick(cb);
}
});

const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
perMessageDeflate.accept([{}]);

const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

const options = { compress: true, fin: true };
sender.send('hi', options);

for (let i = 0; i < numMessages; i++) {
sender.send('hi', options, (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
numErrors++;
});
}
});

it('does not compress data for small payloads', function (done) {
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.notStrictEqual(data[0] & 0x40, 0x40);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -76,7 +126,7 @@ describe('Sender', function () {
it('compresses all frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -87,7 +137,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 6);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -100,7 +151,7 @@ describe('Sender', function () {
it('compresses no frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -111,7 +162,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 5);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -124,7 +176,7 @@ describe('Sender', function () {
it('compresses empty buffer as first fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -135,7 +187,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 8);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -148,7 +201,7 @@ describe('Sender', function () {
it('compresses empty buffer as last fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -159,7 +212,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 3);
done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -172,11 +226,12 @@ describe('Sender', function () {
it('handles many send calls while processing without crashing on flush', function (done) {
let count = 0;
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: () => {
if (++count > 1e4) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -196,14 +251,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -222,14 +278,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand All @@ -249,12 +306,13 @@ describe('Sender', function () {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });

let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data, cb) => {
count++;
if (cb) cb();
}
}, {
});
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

Expand Down
34 changes: 34 additions & 0 deletions test/websocket.test.js
Expand Up @@ -2041,6 +2041,40 @@ describe('WebSocket', function () {
});
});

it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) {
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const map = () => crypto.randomBytes(16);
let count = 100;

Array.from({ length: count }).map(map).forEach((data, i) => {
ws.send(data, (err) => {
assert.ok(err instanceof Error);

if (i > 0) {
// The first message is not queued and compression completes after
// the `'close'` event is emitted on the `net.Socket`.
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
}

if (--count === 0) done();
});
});

wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down

0 comments on commit 7d51fb9

Please sign in to comment.