Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore callbacks when clearing the send queue #1471

Merged
merged 1 commit into from Nov 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 11 additions & 15 deletions lib/sender.js
Expand Up @@ -27,20 +27,6 @@ class Sender {
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];

if (this._extensions[PerMessageDeflate.extensionName]) {
this._socket.once('close', () => {
const err = new Error('WebSocket is not open: readyState 2 (CLOSING)');

while (this._queue.length) {
const params = this._queue.shift();
const cb = params[params.length - 1];

this._bufferedBytes -= params[1].length;
if (typeof cb === 'function') cb(err);
}
});
}
}

/**
Expand Down Expand Up @@ -346,9 +332,19 @@ class Sender {

this._deflating = true;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
this._deflating = false;

if (!this._socket.readable && !this._socket.writable) {
//
// The socket is closed. Clear the queue and bail out.
//
this._bufferedBytes = 0;
this._queue.length = 0;
return;
}

options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
this._deflating = false;
this.dequeue();
});
}
Expand Down
5 changes: 0 additions & 5 deletions lib/websocket.js
Expand Up @@ -123,11 +123,6 @@ class WebSocket extends EventEmitter {
maxPayload
);

//
// `Sender` must be instantiated before adding the `socketOnClose` listener.
// This allows the sender queue, when used, to be emptied before
// `socketOnClose` is called.
//
this._sender = new Sender(socket, this._extensions);
this._receiver = receiver;
this._socket = socket;
Expand Down
55 changes: 29 additions & 26 deletions test/sender.test.js
Expand Up @@ -6,13 +6,14 @@ const PerMessageDeflate = require('../lib/permessage-deflate');
const Sender = require('../lib/sender');

class MockSocket {
constructor ({ write, once } = {}) {
constructor ({ write } = {}) {
this.readable = true;
this.writable = true;

if (write) this.write = write;
if (once) this.once = once;
}

write () {}
once () {}
}

describe('Sender', function () {
Expand Down Expand Up @@ -69,41 +70,43 @@ describe('Sender', function () {
});

it('does not compress enqueued messages after socket closes', function (done) {
const numMessages = 100;
let numErrors = 0;

const mockSocket = new MockSocket({
write: (data) => {
// Test that `PerMessageDeflate#compress()` and `Socket#write()` is
// called only once (for the first message that is not queued).
assert.strictEqual(numErrors, numMessages);
done();
},
once: (ev, cb) => {
if (ev === 'close') process.nextTick(cb);
}
write: () => done(new Error('Unexpected call to socket.write()'))
});

const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
perMessageDeflate.accept([{}]);

const compress = perMessageDeflate.compress;
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

const options = { compress: true, fin: true };
sender.send('hi', options);
perMessageDeflate.compress = (data, fin, callback) => {
compress.call(perMessageDeflate, data, fin, (_, buf) => {
assert.strictEqual(sender._bufferedBytes, 198);
assert.strictEqual(sender._queue.length, 99);
assert.strictEqual(mockSocket.readable, false);
assert.strictEqual(mockSocket.writable, false);

for (let i = 0; i < numMessages; i++) {
sender.send('hi', options, (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
numErrors++;
process.nextTick(() => {
assert.strictEqual(sender._bufferedBytes, 0);
assert.strictEqual(sender._queue.length, 0);
done();
});

callback(_, buf);
});
}
};

const options = { compress: true, fin: true };

for (let i = 0; i < 100; i++) sender.send('hi', options);

process.nextTick(() => {
mockSocket.readable = false;
mockSocket.writable = false;
});
});

it('does not compress data for small payloads', function (done) {
Expand Down
45 changes: 7 additions & 38 deletions test/websocket.test.js
Expand Up @@ -2041,40 +2041,6 @@ describe('WebSocket', function () {
});
});

it('reports the state as `CLOSING` in callbacks of discarded queued messages', function (done) {
const wss = new WebSocket.Server({
perMessageDeflate: { threshold: 0 },
port: 0
}, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
const map = () => crypto.randomBytes(16);
let count = 100;

Array.from({ length: count }).map(map).forEach((data, i) => {
ws.send(data, (err) => {
assert.ok(err instanceof Error);

if (i > 0) {
// The first message is not queued and compression completes after
// the `'close'` event is emitted on the `net.Socket`.
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
}

if (--count === 0) done();
});
});

wss.close();
});
});

describe('#send', function () {
it('ignores the `compress` option if the extension is disabled', function (done) {
const wss = new WebSocket.Server({ port: 0 }, () => {
Expand Down Expand Up @@ -2106,13 +2072,16 @@ describe('WebSocket', function () {
});

ws.on('open', () => {
ws.send('hi', (err) => {
assert.ok(err instanceof Error);
wss.close(done);
});
ws.send('hi', () => done(new Error('Unexpected callback invocation')));
ws.terminate();
});
});

wss.on('connection', (ws) => {
ws.on('close', () => {
wss.close(done);
});
});
});

it('can be used while data is being decompressed', function (done) {
Expand Down