Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement EventSource #2608

Merged
merged 63 commits into from Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
bace6e2
feat: implement eventsource
Uzlopak Jan 11, 2024
cb1db29
add wpts
KhafraDev Jan 11, 2024
926438e
make partially work wpts
Uzlopak Jan 11, 2024
1c7fcb2
fix some
Uzlopak Jan 12, 2024
2e69663
restructure, use ErrorEvent
Uzlopak Jan 12, 2024
79e7419
fix
Uzlopak Jan 12, 2024
54b234a
restructure, create distinct OpenEvent
Uzlopak Jan 12, 2024
3337d7d
add experimental warning, transform inputs
Uzlopak Jan 12, 2024
82214e0
add-types
Uzlopak Jan 15, 2024
56ec56f
restructure
Uzlopak Jan 15, 2024
8c76a8a
add TODO for comment
Uzlopak Jan 15, 2024
755b2d6
use mainFetch
Uzlopak Jan 15, 2024
4e7eeb5
k
Uzlopak Jan 15, 2024
a46bb20
make it terminatable
Uzlopak Jan 15, 2024
bb416e6
fix
Uzlopak Jan 15, 2024
36d9747
fix
Uzlopak Jan 15, 2024
bdc793c
remove OpenEvent
Uzlopak Jan 15, 2024
14e503b
fix wpt runner
Uzlopak Jan 15, 2024
c73c67d
fix
Uzlopak Jan 15, 2024
11ee17d
Merge branch 'main' into eventsource
Uzlopak Jan 16, 2024
2c65c75
Apply suggestions from code review
Uzlopak Jan 16, 2024
7c38511
Update lib/eventsource/index.js
Uzlopak Jan 16, 2024
d7a7dab
fetching
KhafraDev Jan 16, 2024
110fc04
Merge branch 'main' into eventsource
Uzlopak Jan 16, 2024
1fa548b
improve BOM check
Uzlopak Jan 17, 2024
03dc926
improve parseLine
Uzlopak Jan 17, 2024
e04194c
rename back index.js to eventsource.js
Uzlopak Jan 17, 2024
041c648
improve
Uzlopak Jan 17, 2024
55e2729
rename eventSourceState
Uzlopak Jan 17, 2024
fc32639
fix
Uzlopak Jan 17, 2024
11f5814
fix
Uzlopak Jan 17, 2024
6ab5dea
fix
Uzlopak Jan 17, 2024
ba941ea
fix
Uzlopak Jan 17, 2024
f8dbc1a
add settings environment
Uzlopak Jan 17, 2024
b9fed12
fix isNetworkError
Uzlopak Jan 17, 2024
7a834fb
add route, fix 2 tests
KhafraDev Jan 17, 2024
ffaac56
fixup
KhafraDev Jan 17, 2024
46360dd
improve CRLF processing, add tests
Uzlopak Jan 17, 2024
05b0fa7
more tests
Uzlopak Jan 17, 2024
ef19996
remove constants.js
Uzlopak Jan 17, 2024
9ea9156
improve parseLine logic
Uzlopak Jan 18, 2024
1a2d508
rename
Uzlopak Jan 18, 2024
6d8a2e9
fixup
KhafraDev Jan 18, 2024
1531c8d
add ignored tests of wpt
Uzlopak Jan 18, 2024
7b32778
better
Uzlopak Jan 18, 2024
4b7ad3a
fix more
Uzlopak Jan 18, 2024
2e74bb3
Merge branch 'main' into eventsource
Uzlopak Jan 18, 2024
67fef73
add docs
Uzlopak Jan 18, 2024
104da43
fix setting origin on message event
Uzlopak Jan 19, 2024
2ca2594
add more tests
Uzlopak Jan 19, 2024
98eae69
fix wpt tests
Uzlopak Jan 19, 2024
49d40cc
add EventSource documentation to website sidebar
Uzlopak Jan 19, 2024
6d3f48a
activate skipped wpt test
Uzlopak Jan 19, 2024
8841ef1
fix some remarks
Uzlopak Jan 20, 2024
15fea93
simplify
Uzlopak Jan 20, 2024
cb392ef
remove newline
Uzlopak Jan 20, 2024
920885e
remove usage of ErrorEvent
Uzlopak Jan 20, 2024
b643dde
harden
Uzlopak Jan 20, 2024
7fd4a64
more tests
Uzlopak Jan 20, 2024
9592c47
dont check for strings in isValidLastEventId
Uzlopak Jan 20, 2024
3e912ec
add TODOs
Uzlopak Jan 20, 2024
4b18785
improve example for eventsource
Uzlopak Jan 20, 2024
42fbe93
trigger CI because node 21.6.1 got released
Uzlopak Jan 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 78 additions & 0 deletions lib/eventsource/constants.js
@@ -0,0 +1,78 @@
'use strict'

/**
* The event stream format's MIME type is text/event-stream.
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
*/
const mimeType = 'text/event-stream'

/**
* A reconnection time, in milliseconds. This must initially be an implementation-defined value,
* probably in the region of a few seconds.
*
* In Comparison:
* - Chrome uses 3000ms.
* - Deno uses 5000ms.
*/
const defaultReconnectionTime = 3000

/**
* The readyState attribute represents the state of the connection.
* @enum
* @readonly
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html#dom-eventsource-readystate-dev
*/

/**
* The connection has not yet been established, or it was closed and the user
* agent is reconnecting.
* @type {0}
*/
const CONNECTING = 0

/**
* The user agent has an open connection and is dispatching events as it
* receives them.
* @type {1}
*/
const OPEN = 1

/**
* The connection is not open, and the user agent is not trying to reconnect.
* @type {2}
*/
const CLOSED = 2

/**
* @type {number[]} BOM
*/
const BOM = [0xEF, 0xBB, 0xBF]
/**
* @type {10} LF
*/
const LF = 0x0A
/**
* @type {13} CR
*/
const CR = 0x0D
/**
* @type {58} COLON
*/
const COLON = 0x3A
/**
* @type {32} SPACE
*/
const SPACE = 0x20

module.exports = {
mimeType,
defaultReconnectionTime,
CONNECTING,
OPEN,
CLOSED,
BOM,
LF,
CR,
COLON,
SPACE
}
225 changes: 225 additions & 0 deletions lib/eventsource/eventsource-stream.js
Uzlopak marked this conversation as resolved.
Show resolved Hide resolved
@@ -0,0 +1,225 @@
'use strict'
const { Transform } = require('node:stream')
const { MessageEvent } = require('../websocket/events')
const { BOM, CR, LF, COLON, SPACE } = require('./constants')
const { isASCIINumber, isValidLastEventId } = require('./util')

/**
* @typedef {object} EventSourceStreamEvent
* @type {object}
* @property {string} [event] The event type.
* @property {string} [data] The data of the message.
* @property {string} [id] A unique ID for the event.
* @property {string} [retry] The reconnection time, in milliseconds.
*/

/**
* @typedef EventSourceState
* @type {object}
* @property {string} lastEventId The last event ID received from the server.
* @property {string} origin The origin of the event source.
* @property {number} reconnectionTime The reconnection time, in milliseconds.
*/

class EventSourceStream extends Transform {
/**
* @type {EventSourceState}
*/
state = null

/**
* Leading byte-order-mark check.
* @type {boolean}
*/
checkBOM = true

/**
* @type {boolean}
*/
crlfCheck = false

/**
* @type {boolean}
*/
eventEndCheck = false

/**
* @type {Buffer}
*/
buffer = null

pos = 0

event = {
data: undefined,
event: undefined,
id: undefined,
retry: undefined
}

/**
* @param {object} options
* @param {EventSourceState} options.eventSourceState
* @param {Function} [options.push]
*/
constructor (options = {}) {
options.readableObjectMode = true
super(options)
this.state = options.eventSourceState
if (options.push) {
this.push = options.push
}
}

/**
* @param {Buffer} chunk
* @param {string} _encoding
* @param {Function} callback
* @returns {void}
*/
_transform (chunk, _encoding, callback) {
if (chunk.length === 0) {
callback()
return
}
this.buffer = this.buffer ? Buffer.concat([this.buffer, chunk]) : chunk

// Strip leading byte-order-mark if any
if (this.checkBOM) {
switch (this.buffer.length) {
case 1:
if (this.buffer[0] === BOM[0]) {
callback()
return
}
this.checkBOM = false
break
case 2:
if (this.buffer[0] === BOM[0] && this.buffer[1] === BOM[1]) {
callback()
return
}
this.checkBOM = false
break
case 3:
if (this.buffer[0] === BOM[0] && this.buffer[1] === BOM[1] && this.buffer[2] === BOM[2]) {
this.buffer = this.buffer.slice(3)
this.checkBOM = false
callback()
return
}
this.checkBOM = false
break
default:
if (this.buffer[0] === BOM[0] && this.buffer[1] === BOM[1] && this.buffer[2] === BOM[2]) {
this.buffer = this.buffer.slice(3)
}
this.checkBOM = false
break
}
}

while (this.pos < this.buffer.length) {
if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
if (this.eventEndCheck) {
this.eventEndCheck = false
this.processEvent(this.event)
this.event = {
data: undefined,
event: undefined,
id: undefined,
retry: undefined
}
this.buffer = this.buffer.slice(1)
continue
}
if (this.buffer[0] === COLON) {
this.buffer = this.buffer.slice(1)
continue
}
this.parseLine(this.buffer.slice(0, this.pos), this.event)

// Remove the processed line from the buffer
this.buffer = this.buffer.slice(this.pos + 1)
// Reset the position
this.pos = 0
this.eventEndCheck = true
continue
}
this.pos++
}

callback()
}

/**
* @param {Buffer} line
* @param {EventSourceStreamEvent} event
*/
parseLine (line, event) {
if (line.length === 0) {
return
}
const fieldNameEnd = line.indexOf(COLON)
let fieldValueStart

if (fieldNameEnd === -1) {
return
// fieldNameEnd = line.length;
// fieldValueStart = line.length;
}
fieldValueStart = fieldNameEnd + 1
if (line[fieldValueStart] === SPACE) {
fieldValueStart += 1
}

const fieldValueSize = line.length - fieldValueStart
const fieldName = line.slice(0, fieldNameEnd).toString('utf8')
switch (fieldName) {
case 'data':
event.data = line.slice(fieldValueStart, fieldValueStart + fieldValueSize).toString('utf8')
break
case 'event':
event.event = line.slice(fieldValueStart, fieldValueStart + fieldValueSize).toString('utf8')
break
case 'id':
event.id = line.slice(fieldValueStart, fieldValueStart + fieldValueSize).toString('utf8')
break
case 'retry':
event.retry = line.slice(fieldValueStart, fieldValueStart + fieldValueSize).toString('utf8')
break
}
}

/**
* @param {EventSourceStreamEvent} event
*/
processEvent (event) {
if (event.retry) {
if (isASCIINumber(event.retry)) {
this.state.reconnectionTime = parseInt(event.retry, 10)
}
}
const {
id,
data = null,
event: type = 'message'
} = event

if (id && isValidLastEventId(id)) {
this.state.lastEventId = id
}

this.push(
new MessageEvent(type, {
data,
lastEventId: this.state.lastEventId,
origin: this.state.origin
})
)
}
}

module.exports = {
EventSourceStream
}