Skip to content

Commit

Permalink
refactor websocket control frame handling (#3241)
Browse files Browse the repository at this point in the history
Co-authored-by: tai-kun <taikun.dev@gmail.com>
  • Loading branch information
KhafraDev and tai-kun committed May 11, 2024
1 parent 56b0ba3 commit 9302599
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 117 deletions.
253 changes: 137 additions & 116 deletions lib/web/websocket/receiver.js
@@ -1,10 +1,11 @@
'use strict'

const { Writable } = require('node:stream')
const assert = require('node:assert')
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { channels } = require('../../core/diagnostics')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode } = require('./util')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util')
const { WebsocketFrameSend } = require('./frame')
const { CloseEvent } = require('./events')

Expand Down Expand Up @@ -53,30 +54,39 @@ class ByteParser extends Writable {
}

const buffer = this.consume(2)
const fin = (buffer[0] & 0x80) !== 0
const opcode = buffer[0] & 0x0F
const masked = (buffer[1] & 0x80) === 0x80

this.#info.fin = (buffer[0] & 0x80) !== 0
this.#info.opcode = buffer[0] & 0x0F
this.#info.masked = (buffer[1] & 0x80) === 0x80

if (this.#info.masked) {
if (masked) {
failWebsocketConnection(this.ws, 'Frame cannot be masked')
return callback()
}

// If we receive a fragmented message, we use the type of the first
// frame to parse the full message as binary/text, when it's terminated
this.#info.originalOpcode ??= this.#info.opcode

this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
const fragmented = !fin && opcode !== opcodes.CONTINUATION

if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
if (fragmented && opcode !== opcodes.BINARY && opcode !== opcodes.TEXT) {
// Only text and binary frames can be fragmented
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
return
}

const payloadLength = buffer[1] & 0x7F

if (isControlFrame(opcode)) {
const loop = this.parseControlFrame(callback, {
opcode,
fragmented,
payloadLength
})

if (loop) {
continue
} else {
return
}
}

if (payloadLength <= 125) {
this.#info.payloadLength = payloadLength
this.#state = parserStates.READ_DATA
Expand All @@ -86,114 +96,18 @@ class ByteParser extends Writable {
this.#state = parserStates.PAYLOADLENGTH_64
}

// TODO(@KhafraDev): handle continuation frames separately as their
// semantics are different from TEXT/BINARY frames.
this.#info.originalOpcode ??= opcode
this.#info.opcode = opcode
this.#info.masked = masked
this.#info.fin = fin
this.#info.fragmented = fragmented

if (this.#info.fragmented && payloadLength > 125) {
// A fragmented frame can't be fragmented itself
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
return
} else if (
(this.#info.opcode === opcodes.PING ||
this.#info.opcode === opcodes.PONG ||
this.#info.opcode === opcodes.CLOSE) &&
payloadLength > 125
) {
// Control frames can have a payload length of 125 bytes MAX
failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
return
} else if (this.#info.opcode === opcodes.CLOSE) {
if (payloadLength === 1) {
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
return
}

const body = this.consume(payloadLength)

this.#info.closeInfo = this.parseCloseBody(body)

if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

callback(new CloseEvent('close', { wasClean: false, reason, code }))
return
}

if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
// If an endpoint receives a Close frame and did not previously send a
// Close frame, the endpoint MUST send a Close frame in response. (When
// sending a Close frame in response, the endpoint typically echos the
// status code it received.)
let body = emptyBuffer
if (this.#info.closeInfo.code) {
body = Buffer.allocUnsafe(2)
body.writeUInt16BE(this.#info.closeInfo.code, 0)
}
const closeFrame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(
closeFrame.createFrame(opcodes.CLOSE),
(err) => {
if (!err) {
this.ws[kSentClose] = sentCloseFrameState.SENT
}
}
)
}

// Upon either sending or receiving a Close control frame, it is said
// that _The WebSocket Closing Handshake is Started_ and that the
// WebSocket connection is in the CLOSING state.
this.ws[kReadyState] = states.CLOSING
this.ws[kReceivedClose] = true

this.end()

return
} else if (this.#info.opcode === opcodes.PING) {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame.
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

const body = this.consume(payloadLength)

if (!this.ws[kReceivedClose]) {
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body
})
}
}

this.#state = parserStates.INFO

if (this.#byteOffset > 0) {
continue
} else {
callback()
return
}
} else if (this.#info.opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

const body = this.consume(payloadLength)

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: body
})
}

if (this.#byteOffset > 0) {
continue
} else {
callback()
return
}
}
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
if (this.#byteOffset < 2) {
Expand Down Expand Up @@ -303,6 +217,8 @@ class ByteParser extends Writable {
}

parseCloseBody (data) {
assert(data.length !== 1)

// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
/** @type {number|undefined} */
let code
Expand Down Expand Up @@ -336,6 +252,111 @@ class ByteParser extends Writable {
return { code, reason, error: false }
}

/**
* Parses control frames.
* @param {Buffer} data
* @param {(err?: Error) => void} callback
* @param {{ opcode: number, fragmented: boolean, payloadLength: number }} info
*/
parseControlFrame (callback, info) {
assert(!info.fragmented)

if (info.payloadLength > 125) {
// Control frames can have a payload length of 125 bytes MAX
callback(new Error('Payload length for control frame exceeded 125 bytes.'))
return false
}

const body = this.consume(info.payloadLength)

if (info.opcode === opcodes.CLOSE) {
if (info.payloadLength === 1) {
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
return
}

this.#info.closeInfo = this.parseCloseBody(body)

if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

callback(new CloseEvent('close', { wasClean: false, reason, code }))
return
}

if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
// If an endpoint receives a Close frame and did not previously send a
// Close frame, the endpoint MUST send a Close frame in response. (When
// sending a Close frame in response, the endpoint typically echos the
// status code it received.)
let body = emptyBuffer
if (this.#info.closeInfo.code) {
body = Buffer.allocUnsafe(2)
body.writeUInt16BE(this.#info.closeInfo.code, 0)
}
const closeFrame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(
closeFrame.createFrame(opcodes.CLOSE),
(err) => {
if (!err) {
this.ws[kSentClose] = sentCloseFrameState.SENT
}
}
)
}

// Upon either sending or receiving a Close control frame, it is said
// that _The WebSocket Closing Handshake is Started_ and that the
// WebSocket connection is in the CLOSING state.
this.ws[kReadyState] = states.CLOSING
this.ws[kReceivedClose] = true

this.end()

return
} else if (info.opcode === opcodes.PING) {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame.
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

if (!this.ws[kReceivedClose]) {
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body
})
}
}

if (this.#byteOffset <= 0) {
callback()
return false
}
} else if (info.opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: body
})
}

if (this.#byteOffset <= 0) {
callback()
return false
}
}

return true
}

get closingInfo () {
return this.#info.closeInfo
}
Expand Down
15 changes: 14 additions & 1 deletion lib/web/websocket/util.js
Expand Up @@ -210,6 +210,18 @@ function failWebsocketConnection (ws, reason) {
}
}

/**
* @see https://datatracker.ietf.org/doc/html/rfc6455#section-5.5
* @param {number} opcode
*/
function isControlFrame (opcode) {
return (
opcode === opcodes.CLOSE ||
opcode === opcodes.PING ||
opcode === opcodes.PONG
)
}

// https://nodejs.org/api/intl.html#detecting-internationalization-support
const hasIntl = typeof process.versions.icu === 'string'
const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined
Expand Down Expand Up @@ -237,5 +249,6 @@ module.exports = {
isValidStatusCode,
failWebsocketConnection,
websocketMessageReceived,
utf8Decode
utf8Decode,
isControlFrame
}
38 changes: 38 additions & 0 deletions test/websocket/issue-2859.js
@@ -0,0 +1,38 @@
'use strict'

const { test } = require('node:test')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const diagnosticsChannel = require('node:diagnostics_channel')
const { tspl } = require('@matteo.collina/tspl')

test('Fragmented frame with a ping frame in the first of it', async (t) => {
const { completed, deepStrictEqual, strictEqual } = tspl(t, { plan: 2 })

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
const socket = ws._socket

socket.write(Buffer.from([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) // ping "Hello"
socket.write(Buffer.from([0x01, 0x03, 0x48, 0x65, 0x6c])) // Text frame "Hel"
socket.write(Buffer.from([0x80, 0x02, 0x6c, 0x6f])) // Text frame "lo"
})

t.after(() => {
server.close()
ws.close()
})

const ws = new WebSocket(`ws://127.0.0.1:${server.address().port}`)

diagnosticsChannel.channel('undici:websocket:ping').subscribe(
({ payload }) => deepStrictEqual(payload, Buffer.from('Hello'))
)

ws.addEventListener('message', ({ data }) => {
strictEqual(data, 'Hello')
})

await completed
})

0 comments on commit 9302599

Please sign in to comment.