diff --git a/lib/eventsource.js b/lib/eventsource.js index 68d3584a..e8341d1a 100644 --- a/lib/eventsource.js +++ b/lib/eventsource.js @@ -1,7 +1,10 @@ var original = require('original') +var through = require('through2') var parse = require('url').parse var events = require('events') var https = require('https') +var split = require('split2') +var pump = require('pump') var http = require('http') var util = require('util') @@ -10,6 +13,8 @@ var httpsOptions = [ 'rejectUnauthorized', 'secureProtocol', 'servername' ] +var newline = /(\r\n|\n|\r)/ + /** * Creates a new EventSource object * @@ -60,10 +65,6 @@ function EventSource (url, eventSourceInitDict) { delete eventSourceInitDict.headers['Last-Event-ID'] } - var discardTrailingNewline = false - var data = '' - var eventName = '' - var reconnectUrl = null function connect () { @@ -158,56 +159,58 @@ function EventSource (url, eventSourceInitDict) { }) _emit('open', new Event('open')) - // text/event-stream parser adapted from webkit's - // Source/WebCore/page/EventSource.cpp - var buf = '' - res.on('data', function (chunk) { - buf += chunk + var firstChunk = true + pump(res, split(/(\r\n|\n|\r)(\r\n|\n|\r)/), through(function (chunk, enc, cb) { + // Skip BOM if present + var frame = chunk.toString() + if (firstChunk && frame.slice(0, 1) === '\uFEFF') { + frame = frame.slice(1) + } - var pos = 0 - var length = buf.length + // Prepare an event + var eventName = '' + var data = '' - while (pos < length) { - if (discardTrailingNewline) { - if (buf[pos] === '\n') { - ++pos - } - discardTrailingNewline = false + // Split frame into individual lines + var lines = frame.split(newline) + lines.forEach(function (line) { + if (line[0] === ':') { + // Comment, ignore this line + return } - var lineLength = -1 - var fieldLength = -1 - var c - - for (var i = pos; lineLength < 0 && i < length; ++i) { - c = buf[i] - if (c === ':') { - if (fieldLength < 0) { - fieldLength = i - pos - } - } else if (c === '\r') { - discardTrailingNewline = true - lineLength = i - pos - } else if (c === '\n') { - lineLength = i - pos + var fieldSeparator = line.indexOf(':') + var hasValue = fieldSeparator !== -1 + + var field = hasValue ? line.slice(0, fieldSeparator) : line + var value = hasValue ? line.slice(fieldSeparator + 1).replace(/^ /, '') : '' + + if (field === 'event') { + eventName = value + } else if (field === 'data') { + data += value + '\n' + } else if (field === 'id') { + lastEventId = value + } else if (field === 'retry') { + var retry = parseInt(value, 10) + if (!Number.isNaN(retry)) { + self.reconnectInterval = retry } } + }) - if (lineLength < 0) { - break - } - - parseEventStreamLine(buf, pos, fieldLength, lineLength) - - pos += lineLength + 1 + var type = eventName || 'message' + if (data.length > 0) { + _emit(type, new MessageEvent(type, { + data: data.slice(0, -1), // remove trailing newline + lastEventId: lastEventId, + origin: original(url) + })) } - if (pos === length) { - buf = '' - } else if (pos > 0) { - buf = buf.slice(pos) - } - }) + firstChunk = false + cb() + })) }) req.on('error', onConnectionClosed) @@ -229,50 +232,6 @@ function EventSource (url, eventSourceInitDict) { if (req.abort) req.abort() if (req.xhr && req.xhr.abort) req.xhr.abort() } - - function parseEventStreamLine (buf, pos, fieldLength, lineLength) { - if (lineLength === 0) { - if (data.length > 0) { - var type = eventName || 'message' - _emit(type, new MessageEvent(type, { - data: data.slice(0, -1), // remove trailing newline - lastEventId: lastEventId, - origin: original(url) - })) - data = '' - } - eventName = void 0 - } else if (fieldLength > 0) { - var noValue = fieldLength < 0 - var step = 0 - var field = buf.slice(pos, pos + (noValue ? lineLength : fieldLength)) - - if (noValue) { - step = lineLength - } else if (buf[pos + fieldLength + 1] !== ' ') { - step = fieldLength + 1 - } else { - step = fieldLength + 2 - } - pos += step - - var valueLength = lineLength - step - var value = buf.slice(pos, pos + valueLength) - - if (field === 'data') { - data += value + '\n' - } else if (field === 'event') { - eventName = value - } else if (field === 'id') { - lastEventId = value - } else if (field === 'retry') { - var retry = parseInt(value, 10) - if (!Number.isNaN(retry)) { - self.reconnectInterval = retry - } - } - } - } } module.exports = EventSource diff --git a/package.json b/package.json index 1e25f5a9..d7320598 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ } ], "devDependencies": { + "buffer-from": "^1.1.1", "express": "^4.15.3", "mocha": "^3.5.3", "nyc": "^11.2.1", @@ -48,7 +49,10 @@ "node": ">=0.12.0" }, "dependencies": { - "original": "^1.0.0" + "original": "^1.0.0", + "pump": "^3.0.0", + "split2": "^3.0.0", + "through2": "^2.0.3" }, "standard": { "ignore": [ diff --git a/test/eventsource_test.js b/test/eventsource_test.js index ea366cc0..1da0a2da 100644 --- a/test/eventsource_test.js +++ b/test/eventsource_test.js @@ -1,5 +1,6 @@ /* eslint-disable no-new */ var EventSource = require('../lib/eventsource') +var bufferFrom = require('buffer-from') var path = require('path') var http = require('http') var https = require('https') @@ -167,6 +168,24 @@ describe('Parser', function () { }) }) + it('ignores byte-order mark', function (done) { + createServer(function (err, server) { + if (err) return done(err) + + server.on('request', function (req, res) { + res.writeHead(200, {'Content-Type': 'text/event-stream'}) + res.write('\uFEFF') + res.write('data: foo') + res.end() + }) + var es = new EventSource(server.url) + es.onmessage = function (m) { + assert.equal('foo', m.data) + server.close(done) + } + }) + }) + it('parses one one-line message in two chunks', function (done) { createServer(function (err, server) { if (err) return done(err) @@ -215,7 +234,7 @@ describe('Parser', function () { }) }) - it('parses really chopped up unicode data', function (done) { + it('parses chopped up unicode data', function (done) { createServer(function (err, server) { if (err) return done(err) @@ -237,6 +256,30 @@ describe('Parser', function () { }) }) + it('parses really chopped up unicode data', function (done) { + createServer(function (err, server) { + if (err) return done(err) + + server.on('request', function (req, res) { + const msg = bufferFrom('data: Aslak Hellesøy is the original author\n\n') + res.writeHead(200, {'Content-Type': 'text/event-stream'}) + + // Slice in the middle of a unicode sequence (ø), making sure that one data + // chunk will contain the first byte and the second chunk will get the other + res.write(msg.slice(0, 19), 'binary', function () { + res.write(msg.slice(19)) + }) + }) + + var es = new EventSource(server.url) + + es.onmessage = function (m) { + assert.equal('Aslak Hellesøy is the original author', m.data) + server.close(done) + } + }) + }) + it('accepts CRLF as separator', function (done) { createServer(function (err, server) { if (err) return done(err)