Skip to content

Commit

Permalink
[perf] Reduce buffer allocations
Browse files Browse the repository at this point in the history
Do not convert strings to `Buffer`s if data does not need to be masked.

Refs: #1998
  • Loading branch information
lpinca committed Jan 6, 2022
1 parent 4081a36 commit b1a6e81
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 46 deletions.
4 changes: 2 additions & 2 deletions lib/permessage-deflate.js
Expand Up @@ -313,7 +313,7 @@ class PerMessageDeflate {
/**
* Compress data. Concurrency limited.
*
* @param {Buffer} data Data to compress
* @param {(Buffer|String)} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
Expand Down Expand Up @@ -395,7 +395,7 @@ class PerMessageDeflate {
/**
* Compress data.
*
* @param {Buffer} data Data to compress
* @param {(Buffer|String)} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @private
Expand Down
140 changes: 103 additions & 37 deletions lib/sender.js
Expand Up @@ -46,8 +46,10 @@ class Sender {
/**
* Frames a piece of data according to the HyBi WebSocket protocol.
*
* @param {Buffer} data The data to frame
* @param {(Buffer|String)} data The data to frame
* @param {Object} options Options object
* @param {Number} [options.byteLength] The length in bytes of the data. Only
* relevant if data is a string.
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
* @param {Function} [options.generateMask] The function used to generate the
Expand All @@ -58,7 +60,7 @@ class Sender {
* key
* @param {Number} options.opcode The opcode
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
* modified
* modified. relevant if data is a `Buffer`.
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
* RSV1 bit
* @return {Buffer[]} The framed data as a list of `Buffer` instances
Expand All @@ -80,12 +82,27 @@ class Sender {
}

skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
if (options.readOnly && !skipMasking) merge = true;

offset = 6;
}

let payloadLength = data.length;
let dataLength;

if (typeof data === 'string') {
if (!options.mask || skipMasking) {
dataLength =
options.byteLength !== undefined
? options.byteLength
: Buffer.byteLength(data);
} else {
data = Buffer.from(data);
dataLength = data.length;
}
} else {
dataLength = data.length;
merge = options.mask && options.readOnly && !skipMasking;
}

let payloadLength = dataLength;

if (data.length >= 65536) {
offset += 8;
Expand All @@ -95,18 +112,18 @@ class Sender {
payloadLength = 126;
}

const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);
const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);

target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
if (options.rsv1) target[0] |= 0x40;

target[1] = payloadLength;

if (payloadLength === 126) {
target.writeUInt16BE(data.length, 2);
target.writeUInt16BE(dataLength, 2);
} else if (payloadLength === 127) {
target[2] = target[3] = 0;
target.writeUIntBE(data.length, 4, 6);
target.writeUIntBE(dataLength, 4, 6);
}

if (!options.mask) return [target, data];
Expand Down Expand Up @@ -165,23 +182,25 @@ class Sender {
}

if (this._deflating) {
this.enqueue([this.doClose, buf, mask, cb]);
this.enqueue([this.doClose, buf, buf.length, mask, cb]);
} else {
this.doClose(buf, mask, cb);
this.doClose(buf, buf.length, mask, cb);
}
}

/**
* Frames and sends a close message.
*
* @param {Buffer} data The message to send
* @param {Number} byteLength The length in bytes of the message
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Function} [cb] Callback
* @private
*/
doClose(data, mask, cb) {
doClose(data, byteLength, mask, cb) {
this.sendFrame(
Sender.frame(data, {
byteLength,
fin: true,
rsv1: false,
opcode: 0x08,
Expand All @@ -203,31 +222,43 @@ class Sender {
* @public
*/
ping(data, mask, cb) {
const buf = toBuffer(data);
let byteLength;
let readOnly;

if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
readOnly = toBuffer.readOnly;
}

if (buf.length > 125) {
if (byteLength > 125) {
throw new RangeError('The data size must not be greater than 125 bytes');
}

if (this._deflating) {
this.enqueue([this.doPing, buf, mask, toBuffer.readOnly, cb]);
this.enqueue([this.doPing, data, byteLength, mask, readOnly, cb]);
} else {
this.doPing(buf, mask, toBuffer.readOnly, cb);
this.doPing(data, byteLength, mask, readOnly, cb);
}
}

/**
* Frames and sends a ping message.
*
* @param {Buffer} data The message to send
* @param {(Buffer|String)} data The message to send
* @param {Number} byteLength The length in bytes of the message
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
* @param {Function} [cb] Callback
* @private
*/
doPing(data, mask, readOnly, cb) {
doPing(data, byteLength, mask, readOnly, cb) {
this.sendFrame(
Sender.frame(data, {
byteLength,
fin: true,
rsv1: false,
opcode: 0x09,
Expand All @@ -249,31 +280,43 @@ class Sender {
* @public
*/
pong(data, mask, cb) {
const buf = toBuffer(data);
let byteLength;
let readOnly;

if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
readOnly = toBuffer.readOnly;
}

if (buf.length > 125) {
if (byteLength > 125) {
throw new RangeError('The data size must not be greater than 125 bytes');
}

if (this._deflating) {
this.enqueue([this.doPong, buf, mask, toBuffer.readOnly, cb]);
this.enqueue([this.doPong, data, byteLength, mask, readOnly, cb]);
} else {
this.doPong(buf, mask, toBuffer.readOnly, cb);
this.doPong(data, byteLength, mask, readOnly, cb);
}
}

/**
* Frames and sends a pong message.
*
* @param {Buffer} data The message to send
* @param {(Buffer|String)} data The message to send
* @param {Number} byteLength The length in bytes of the message
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Boolean} [readOnly=false] Specifies whether `data` can be modified
* @param {Function} [cb] Callback
* @private
*/
doPong(data, mask, readOnly, cb) {
doPong(data, byteLength, mask, readOnly, cb) {
this.sendFrame(
Sender.frame(data, {
byteLength,
fin: true,
rsv1: false,
opcode: 0x0a,
Expand Down Expand Up @@ -303,11 +346,22 @@ class Sender {
* @public
*/
send(data, options, cb) {
const buf = toBuffer(data);
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
let opcode = options.binary ? 2 : 1;
let rsv1 = options.compress;

let byteLength;
let readOnly;

if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
readOnly = toBuffer.readOnly;
}

if (this._firstFragment) {
this._firstFragment = false;
if (
Expand All @@ -319,7 +373,7 @@ class Sender {
: 'client_no_context_takeover'
]
) {
rsv1 = buf.length >= perMessageDeflate._threshold;
rsv1 = byteLength >= perMessageDeflate._threshold;
}
this._compress = rsv1;
} else {
Expand All @@ -331,30 +385,39 @@ class Sender {

if (perMessageDeflate) {
const opts = {
byteLength,
fin: options.fin,
rsv1,
opcode,
mask: options.mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly: toBuffer.readOnly
readOnly
};

if (this._deflating) {
this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
this.enqueue([
this.dispatch,
data,
byteLength,
this._compress,
opts,
cb
]);
} else {
this.dispatch(buf, this._compress, opts, cb);
this.dispatch(data, byteLength, this._compress, opts, cb);
}
} else {
this.sendFrame(
Sender.frame(buf, {
Sender.frame(data, {
byteLength,
fin: options.fin,
rsv1: false,
opcode,
mask: options.mask,
maskBuffer: this._maskBuffer,
generateMask: this._generateMask,
readOnly: toBuffer.readOnly
readOnly
}),
cb
);
Expand All @@ -364,11 +427,13 @@ class Sender {
/**
* Dispatches a data message.
*
* @param {Buffer} data The message to send
* @param {(Buffer|String)} data The message to send
* @param {Number} byteLength The length in bytes of the message
* @param {Boolean} [compress=false] Specifies whether or not to compress
* `data`
* @param {Object} options Options object
* @param {Number} options.opcode The opcode
* @param {Number} [options.byteLength] The length in bytes of the message
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
* @param {Function} [options.generateMask] The function used to generate the
Expand All @@ -384,15 +449,15 @@ class Sender {
* @param {Function} [cb] Callback
* @private
*/
dispatch(data, compress, options, cb) {
dispatch(data, byteLength, compress, options, cb) {
if (!compress) {
this.sendFrame(Sender.frame(data, options), cb);
return;
}

const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

this._bufferedBytes += data.length;
this._bufferedBytes += byteLength;
this._deflating = true;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
if (this._socket.destroyed) {
Expand All @@ -403,15 +468,16 @@ class Sender {
if (typeof cb === 'function') cb(err);

for (let i = 0; i < this._queue.length; i++) {
const callback = this._queue[i][4];
const params = this._queue[i];
const callback = params[params.length - 1];

if (typeof callback === 'function') callback(err);
}

return;
}

this._bufferedBytes -= data.length;
this._bufferedBytes -= byteLength;
this._deflating = false;
options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
Expand All @@ -428,7 +494,7 @@ class Sender {
while (!this._deflating && this._queue.length) {
const params = this._queue.shift();

this._bufferedBytes -= params[1].length;
this._bufferedBytes -= params[2];
Reflect.apply(params[0], this, params.slice(1));
}
}
Expand All @@ -440,7 +506,7 @@ class Sender {
* @private
*/
enqueue(params) {
this._bufferedBytes += params[1].length;
this._bufferedBytes += params[2];
this._queue.push(params);
}

Expand Down

0 comments on commit b1a6e81

Please sign in to comment.