Skip to content

Commit

Permalink
Process one frame at a time, fixing unicode chunking issue
Browse files Browse the repository at this point in the history
  • Loading branch information
rexxars committed Aug 22, 2018
1 parent b009a96 commit aba2c2c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 91 deletions.
137 changes: 48 additions & 89 deletions lib/eventsource.js
@@ -1,7 +1,10 @@
var original = require('original')
var through = require('through2')
var parse = require('url').parse
var events = require('events')
var https = require('https')
var split = require('split2')
var pump = require('pump')
var http = require('http')
var util = require('util')

Expand All @@ -10,6 +13,8 @@ var httpsOptions = [
'rejectUnauthorized', 'secureProtocol', 'servername'
]

var newline = /(\r\n|\n|\r)/

/**
* Creates a new EventSource object
*
Expand Down Expand Up @@ -60,10 +65,6 @@ function EventSource (url, eventSourceInitDict) {
delete eventSourceInitDict.headers['Last-Event-ID']
}

var discardTrailingNewline = false
var data = ''
var eventName = ''

var reconnectUrl = null

function connect () {
Expand Down Expand Up @@ -158,56 +159,58 @@ function EventSource (url, eventSourceInitDict) {
})
_emit('open', new Event('open'))

// text/event-stream parser adapted from webkit's
// Source/WebCore/page/EventSource.cpp
var buf = ''
res.on('data', function (chunk) {
buf += chunk
var firstChunk = true
pump(res, split(/(\r\n|\n|\r)(\r\n|\n|\r)/), through(function (chunk, enc, cb) {
// Skip BOM if present
var frame = chunk.toString()
if (firstChunk && frame.slice(0, 1) === '\uFEFF') {
frame = frame.slice(1)
}

var pos = 0
var length = buf.length
// Prepare an event
var eventName = ''
var data = ''

while (pos < length) {
if (discardTrailingNewline) {
if (buf[pos] === '\n') {
++pos
}
discardTrailingNewline = false
// Split frame into individual lines
var lines = frame.split(newline)
lines.forEach(function (line) {
if (line[0] === ':') {
// Comment, ignore this line
return
}

var lineLength = -1
var fieldLength = -1
var c

for (var i = pos; lineLength < 0 && i < length; ++i) {
c = buf[i]
if (c === ':') {
if (fieldLength < 0) {
fieldLength = i - pos
}
} else if (c === '\r') {
discardTrailingNewline = true
lineLength = i - pos
} else if (c === '\n') {
lineLength = i - pos
var fieldSeparator = line.indexOf(':')
var hasValue = fieldSeparator !== -1

var field = hasValue ? line.slice(0, fieldSeparator) : line
var value = hasValue ? line.slice(fieldSeparator + 1).replace(/^ /, '') : ''

if (field === 'event') {
eventName = value
} else if (field === 'data') {
data += value + '\n'
} else if (field === 'id') {
lastEventId = value
} else if (field === 'retry') {
var retry = parseInt(value, 10)
if (!Number.isNaN(retry)) {
self.reconnectInterval = retry
}
}
})

if (lineLength < 0) {
break
}

parseEventStreamLine(buf, pos, fieldLength, lineLength)

pos += lineLength + 1
var type = eventName || 'message'
if (data.length > 0) {
_emit(type, new MessageEvent(type, {
data: data.slice(0, -1), // remove trailing newline
lastEventId: lastEventId,
origin: original(url)
}))
}

if (pos === length) {
buf = ''
} else if (pos > 0) {
buf = buf.slice(pos)
}
})
firstChunk = false
cb()
}))
})

req.on('error', onConnectionClosed)
Expand All @@ -229,50 +232,6 @@ function EventSource (url, eventSourceInitDict) {
if (req.abort) req.abort()
if (req.xhr && req.xhr.abort) req.xhr.abort()
}

function parseEventStreamLine (buf, pos, fieldLength, lineLength) {
if (lineLength === 0) {
if (data.length > 0) {
var type = eventName || 'message'
_emit(type, new MessageEvent(type, {
data: data.slice(0, -1), // remove trailing newline
lastEventId: lastEventId,
origin: original(url)
}))
data = ''
}
eventName = void 0
} else if (fieldLength > 0) {
var noValue = fieldLength < 0
var step = 0
var field = buf.slice(pos, pos + (noValue ? lineLength : fieldLength))

if (noValue) {
step = lineLength
} else if (buf[pos + fieldLength + 1] !== ' ') {
step = fieldLength + 1
} else {
step = fieldLength + 2
}
pos += step

var valueLength = lineLength - step
var value = buf.slice(pos, pos + valueLength)

if (field === 'data') {
data += value + '\n'
} else if (field === 'event') {
eventName = value
} else if (field === 'id') {
lastEventId = value
} else if (field === 'retry') {
var retry = parseInt(value, 10)
if (!Number.isNaN(retry)) {
self.reconnectInterval = retry
}
}
}
}
}

module.exports = EventSource
Expand Down
6 changes: 5 additions & 1 deletion package.json
Expand Up @@ -30,6 +30,7 @@
}
],
"devDependencies": {
"buffer-from": "^1.1.1",
"express": "^4.15.3",
"mocha": "^3.5.3",
"nyc": "^11.2.1",
Expand All @@ -48,7 +49,10 @@
"node": ">=0.12.0"
},
"dependencies": {
"original": "^1.0.0"
"original": "^1.0.0",
"pump": "^3.0.0",
"split2": "^3.0.0",
"through2": "^2.0.3"
},
"standard": {
"ignore": [
Expand Down
45 changes: 44 additions & 1 deletion test/eventsource_test.js
@@ -1,5 +1,6 @@
/* eslint-disable no-new */
var EventSource = require('../lib/eventsource')
var bufferFrom = require('buffer-from')
var path = require('path')
var http = require('http')
var https = require('https')
Expand Down Expand Up @@ -167,6 +168,24 @@ describe('Parser', function () {
})
})

it('ignores byte-order mark', function (done) {
createServer(function (err, server) {
if (err) return done(err)

server.on('request', function (req, res) {
res.writeHead(200, {'Content-Type': 'text/event-stream'})
res.write('\uFEFF')
res.write('data: foo')
res.end()
})
var es = new EventSource(server.url)
es.onmessage = function (m) {
assert.equal('foo', m.data)
server.close(done)
}
})
})

it('parses one one-line message in two chunks', function (done) {
createServer(function (err, server) {
if (err) return done(err)
Expand Down Expand Up @@ -215,7 +234,7 @@ describe('Parser', function () {
})
})

it('parses really chopped up unicode data', function (done) {
it('parses chopped up unicode data', function (done) {
createServer(function (err, server) {
if (err) return done(err)

Expand All @@ -237,6 +256,30 @@ describe('Parser', function () {
})
})

it('parses really chopped up unicode data', function (done) {
createServer(function (err, server) {
if (err) return done(err)

server.on('request', function (req, res) {
const msg = bufferFrom('data: Aslak Hellesøy is the original author\n\n')
res.writeHead(200, {'Content-Type': 'text/event-stream'})

// Slice in the middle of a unicode sequence (ø), making sure that one data
// chunk will contain the first byte and the second chunk will get the other
res.write(msg.slice(0, 19), 'binary', function () {
res.write(msg.slice(19))
})
})

var es = new EventSource(server.url)

es.onmessage = function (m) {
assert.equal('Aslak Hellesøy is the original author', m.data)
server.close(done)
}
})
})

it('accepts CRLF as separator', function (done) {
createServer(function (err, server) {
if (err) return done(err)
Expand Down

0 comments on commit aba2c2c

Please sign in to comment.