Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not waste time compressing when socket is closed #1464

Merged
merged 13 commits into from Nov 6, 2018
18 changes: 18 additions & 0 deletions lib/sender.js
Expand Up @@ -27,6 +27,24 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

this._socket.prependOnceListener('close', () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need prependOnceListener()? Or is it used only to ensure that the listener is added before the one added in websocket.js? In the latter case I would just add a comment in WebSocket#setSocket() to specify that the Sender must be instantiated before adding the 'close' listener to the net.Socket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out we do actually need it. I was testing a different ready state in my initial pass.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm ok, that's strange as this is added before the other anyway. I'm not a fan of prependOnceListener() because it will add the listener before all other listeners including the ones added by Node.js itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me poke it again, I may end up blaming jet lag. :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, going to blame jet lag. Sorry for the confusion. We do not need prepend, I'll switch it over to the comment instead. The issue was I was switching prepend to once in the Sender but not in the MockSocket for the test.

const err = new Error(
`WebSocket is not open: readyState ${this._socket.readyState} `
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit misleading because readyState here is the ready state of the net.Socket and not the ready state of the WebSocket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Would it be better to have a simple generic message here? The original intent was to provide consistency with what the user would get as an error regardless of whether their send attempt was queued or not, but since it was already inconsistent a generic message wouldn't break anything here.

Copy link
Member

@lpinca lpinca Oct 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can pass the Sender a reference to the WebSocket so we can:

  1. Set the ready state to CLOSING
  2. Use it to make the error message consistent.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to avoid that because I don't like how it hard links Sender to WebSocket. A Sender doesn't need a WebSocket to function.

Copy link
Member

@lpinca lpinca Oct 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, the same is also true for the Receiver but we add a WebSocket reference to it in order to use it in the event listeners.

);

while (this._queue.length) {
const params = this._queue.shift();

this._bufferedBytes -= params[1].length;

const cb = params[params.length - 1];

if (typeof cb === 'function') {
cb(err);
}
}
});
}

/**
Expand Down
111 changes: 81 additions & 30 deletions test/sender.test.js
Expand Up @@ -5,6 +5,20 @@ const assert = require('assert');
const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, on, once, prependOnceListener } = {}) {
if (write) this.write = write;
if (on) this.on = on;
if (once) this.once = once;
if (prependOnceListener) this.prependOnceListener = prependOnceListener;
}

write () {}
on () {}
once () {}
prependOnceListener () {}
}

describe('Sender', function () {
describe('.frame', function () {
it('does not mutate the input buffer if data is `readOnly`', function () {
Expand Down Expand Up @@ -38,14 +52,13 @@ describe('Sender', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.strictEqual(data[0] & 0x40, 0x40);
if (++count === 3) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -57,16 +70,62 @@ describe('Sender', function () {
sender.send('hi', options);
});

it('does not attempt to compress enqueued messages after socket closes', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const numMessages = 1000;
let numWritten = 0;
const mockSocket = new MockSocket({
write: (data) => {
assert.strictEqual(data[0] & 0x40, 0x40);
if (++numWritten > 1) done(new Error('Too many attempted writes'));
},
prependOnceListener: (ev, cb) => {
if (ev === 'close') {
process.nextTick(cb);
}
}
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

let numCompressed = 0;

perMessageDeflate.compress = (data, fin, cb) => {
if (++numCompressed > 1) {
done(new Error('Compressed too many times'));
}

setTimeout(() => cb(null, data), 1);
};

perMessageDeflate.accept([{}]);

const options = { compress: true, fin: false };
const array = new Uint8Array([0x68, 0x69]);

sender.send(array.buffer, options, () => {});
sender.send(array, options, () => {});

let numErrors = 0;
for (let i = 0; i < numMessages; ++i) {
sender.send('hi', options, (err) => {
if (!err) return;

if (++numErrors === numMessages) {
done();
}
});
}
});

it('does not compress data for small payloads', function (done) {
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
assert.notStrictEqual(data[0] & 0x40, 0x40);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -76,7 +135,7 @@ describe('Sender', function () {
it('compresses all frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -87,9 +146,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 6);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -100,7 +158,7 @@ describe('Sender', function () {
it('compresses no frames in a fragmented message', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 3 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -111,9 +169,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 5);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -124,7 +181,7 @@ describe('Sender', function () {
it('compresses empty buffer as first fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -135,9 +192,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 8);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -148,7 +204,7 @@ describe('Sender', function () {
it('compresses empty buffer as last fragment', function (done) {
const fragments = [];
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
fragments.push(data);
if (fragments.length !== 2) return;
Expand All @@ -159,9 +215,8 @@ describe('Sender', function () {
assert.strictEqual(fragments[1].length, 3);
done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -172,13 +227,12 @@ describe('Sender', function () {
it('handles many send calls while processing without crashing on flush', function (done) {
let count = 0;
const perMessageDeflate = new PerMessageDeflate();
const sender = new Sender({
const mockSocket = new MockSocket({
write: () => {
if (++count > 1e4) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -196,16 +250,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x89, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -222,16 +275,15 @@ describe('Sender', function () {
it('works with multiple types of data', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data) => {
if (++count === 1) return;

assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (count === 4) done();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand All @@ -249,14 +301,13 @@ describe('Sender', function () {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });

let count = 0;
const sender = new Sender({
const mockSocket = new MockSocket({
write: (data, cb) => {
count++;
if (cb) cb();
}
}, {
'permessage-deflate': perMessageDeflate
});
const sender = new Sender(mockSocket, { 'permessage-deflate': perMessageDeflate });

perMessageDeflate.accept([{}]);

Expand Down
32 changes: 32 additions & 0 deletions test/websocket.test.js
Expand Up @@ -2040,6 +2040,38 @@ describe('WebSocket', function () {
});
});

it('reports the web socket as CLOSING in error callbacks when connection is terminated abnormally', function (done) {
console.log('================================================= START');
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`, {
perMessageDeflate: { threshold: 0 } });
const messages = [];

ws.on('message', (message) => messages.push(message));
ws.on('close', (code) => {
console.log('closing ws');
assert.strictEqual(code, 1006);
assert.deepStrictEqual(messages, []);
done();
});
});

wss.on('connection', (ws) => {
let checkState = () => {
if (ws._sender._queue.length) {
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
}
};
for (let i = 0; i < 1000; ++i) {
ws.send('foo', { compress: true }, checkState);
}
wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down