Skip to content

Commit

Permalink
[feature] Add utility to wrap a WebSocket in a Duplex stream
Browse files Browse the repository at this point in the history
Fixes #113
  • Loading branch information
lpinca committed Jun 17, 2019
1 parent 38d3bf2 commit 7ff212b
Show file tree
Hide file tree
Showing 5 changed files with 598 additions and 4 deletions.
19 changes: 18 additions & 1 deletion README.md
Expand Up @@ -34,6 +34,7 @@ can use one of the many wrappers available on npm, like
- [Multiple servers sharing a single HTTP/S server](#multiple-servers-sharing-a-single-https-server)
- [Server broadcast](#server-broadcast)
- [echo.websocket.org demo](#echowebsocketorg-demo)
- [Use the Node.js streams API](#use-the-nodejs-streams-api)
- [Other examples](#other-examples)
- [FAQ](#faq)
- [How to get the IP address of the client?](#how-to-get-the-ip-address-of-the-client)
Expand Down Expand Up @@ -69,7 +70,8 @@ necessarily need to have a C++ compiler installed on your machine.

## API docs

See [`/doc/ws.md`](./doc/ws.md) for Node.js-like docs for the ws classes.
See [`/doc/ws.md`](./doc/ws.md) for Node.js-like documentation of ws classes and
utility functions.

## WebSocket compression

Expand Down Expand Up @@ -302,6 +304,21 @@ ws.on('message', function incoming(data) {
});
```

### Use the Node.js streams API

```js
const WebSocket = require('ws');

const ws = new WebSocket('wss://echo.websocket.org/', {
origin: 'https://websocket.org'
});

const duplex = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' });

duplex.pipe(process.stdout);
process.stdin.pipe(duplex);
```

### Other examples

For a full example with a browser client communicating with a ws server, see the
Expand Down
18 changes: 15 additions & 3 deletions doc/ws.md
Expand Up @@ -43,6 +43,7 @@
- [websocket.send(data[, options][, callback])](#websocketsenddata-options-callback)
- [websocket.terminate()](#websocketterminate)
- [websocket.url](#websocketurl)
- [WebSocket.createWebSocketStream(websocket[, options])](#websocketcreatewebsocketstreamwebsocket-options)

## Class: WebSocket.Server

Expand Down Expand Up @@ -463,11 +464,22 @@ Forcibly close the connection.

The URL of the WebSocket server. Server clients don't have this attribute.

## WebSocket.createWebSocketStream(websocket[, options])

- `websocket` {WebSocket} A `WebSocket` object.
- `options` {Object} [Options][duplex-options] to pass to the `Duplex`
constructor.

Returns a `Duplex` stream that allows to use the Node.js streams API on top of a
given `WebSocket`.

[concurrency-limit]: https://github.com/websockets/ws/issues/1202
[permessage-deflate]:
https://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-19
[zlib-options]: https://nodejs.org/api/zlib.html#zlib_class_options
[duplex-options]:
https://nodejs.org/api/stream.html#stream_new_stream_duplex_options
[http.request()]:
https://nodejs.org/api/http.html#http_http_request_options_callback
[https.request()]:
https://nodejs.org/api/https.html#https_https_request_options_callback
[permessage-deflate]:
https://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-19
[zlib-options]: https://nodejs.org/api/zlib.html#zlib_class_options
1 change: 1 addition & 0 deletions index.js
Expand Up @@ -2,6 +2,7 @@

const WebSocket = require('./lib/websocket');

WebSocket.createWebSocketStream = require('./lib/stream');
WebSocket.Server = require('./lib/websocket-server');
WebSocket.Receiver = require('./lib/receiver');
WebSocket.Sender = require('./lib/sender');
Expand Down
150 changes: 150 additions & 0 deletions lib/stream.js
@@ -0,0 +1,150 @@
'use strict';

const { Duplex } = require('stream');

/**
* Emits the `'close'` event on a stream.
*
* @param {stream.Duplex} The stream.
* @private
*/
function emitClose(stream) {
stream.emit('close');
}

/**
* The listener of the `'end'` event.
*
* @private
*/
function duplexOnEnd() {
if (!this.destroyed && this._writableState.finished) {
this.destroy();
}
}

/**
* The listener of the `'error'` event.
*
* @private
*/
function duplexOnError(err) {
this.removeListener('error', duplexOnError);
this.destroy();
if (this.listenerCount('error') === 0) {
// Do not suppress the throwing behavior.
this.emit('error', err);
}
}

/**
* Wraps a `WebSocket` in a duplex stream.
*
* @param {WebSocket} ws The `WebSocket` to wrap
* @param {Object} options The options for the `Duplex` constructor
* @return {stream.Duplex} The duplex stream
* @public
*/
function createWebSocketStream(ws, options) {
let resumeOnReceiverDrain = true;

function receiverOnDrain() {
if (resumeOnReceiverDrain) ws._socket.resume();
}

if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
ws._receiver.removeAllListeners('drain');
ws._receiver.on('drain', receiverOnDrain);
});
} else {
ws._receiver.removeAllListeners('drain');
ws._receiver.on('drain', receiverOnDrain);
}

const duplex = new Duplex({
...options,
autoDestroy: false,
emitClose: false,
objectMode: false,
readableObjectMode: false,
writableObjectMode: false
});

ws.on('message', function message(msg) {
if (!duplex.push(msg)) {
resumeOnReceiverDrain = false;
ws._socket.pause();
}
});

ws.once('error', function error(err) {
duplex.destroy(err);
});

ws.once('close', function close() {
if (duplex.destroyed) return;

duplex.push(null);
});

duplex._destroy = function(err, callback) {
if (ws.readyState === ws.CLOSED) {
callback(err);
process.nextTick(emitClose, duplex);
return;
}

ws.once('close', function close() {
callback(err);
process.nextTick(emitClose, duplex);
});
ws.terminate();
};

duplex._final = function(callback) {
if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
duplex._final(callback);
});
return;
}

if (ws._socket._writableState.finished) {
if (duplex._readableState.endEmitted) duplex.destroy();
callback();
} else {
ws._socket.once('finish', function finish() {
// `duplex` is not destroyed here because the `'end'` event will be
// emitted on `duplex` after this `'finish'` event. The EOF signaling
// `null` chunk is, in fact, pushed when the WebSocket emits `'close'`.
callback();
});
ws.close();
}
};

duplex._read = function() {
if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
resumeOnReceiverDrain = true;
if (!ws._receiver._writableState.needDrain) ws._socket.resume();
}
};

duplex._write = function(chunk, encoding, callback) {
if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
duplex._write(chunk, encoding, callback);
});
return;
}

ws.send(chunk, callback);
};

duplex.on('end', duplexOnEnd);
duplex.on('error', duplexOnError);
return duplex;
}

module.exports = createWebSocketStream;

0 comments on commit 7ff212b

Please sign in to comment.