Skip to content

Commit

Permalink
feat: new implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
climba03003 committed Oct 27, 2022
1 parent 2065c11 commit 670641f
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 29 deletions.
30 changes: 19 additions & 11 deletions lib/reply.js
@@ -1,6 +1,7 @@
'use strict'

const eos = require('stream').finished
const Cloneable = require('cloneable-readable')

const {
kFourOhFourContext,
Expand Down Expand Up @@ -618,8 +619,8 @@ function sendStream (payload, res, reply) {
let sourceOpen = true
let errorLogged = false

// set trailer when stream ended
sendStreamTrailer(payload, res, reply)
// clone stream for trailer
if (reply[kReplyTrailers] !== null) payload = Cloneable(payload)

eos(payload, { readable: true, writable: false }, function (err) {
sourceOpen = false
Expand Down Expand Up @@ -666,18 +667,20 @@ function sendStream (payload, res, reply) {
} else {
reply.log.warn('response will send, but you shouldn\'t use res.writeHead in stream mode')
}
payload.pipe(res)

sendStreamTrailer(payload, res, reply)

payload.pipe(res, { end: reply[kReplyTrailers] === null })
}

function sendTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
const trailerHeaders = Object.keys(reply[kReplyTrailers])
const trailers = {}
let handled = 0
for (const trailerName of trailerHeaders) {
if (typeof reply[kReplyTrailers][trailerName] !== 'function') continue
handled--

const _trailers = Object.entries(reply[kReplyTrailers]).filter(([k, v]) => typeof v === 'function')

for (const [name, func] of _trailers) {
function cb (err, value) {
// TODO: we may protect multiple callback calls
// or mixing async-await with callback
Expand All @@ -687,16 +690,18 @@ function sendTrailer (payload, res, reply) {
// since it does affect the client
// we log in here only for debug usage
if (err) reply.log.debug(err)
else trailers[trailerName] = value
else trailers[name] = value

// add trailers when all handler handled
/* istanbul ignore else */
if (handled === 0) {
if (_trailers.length === handled) {
res.addTrailers(trailers)
// end the stream properly
res.end(null, null, null)
}
}

const result = reply[kReplyTrailers][trailerName](reply, payload, cb)
const result = func(reply, payload instanceof Cloneable ? payload.clone() : payload, cb)
if (typeof result === 'object' && typeof result.then === 'function') {
result.then((v) => cb(null, v), cb)
} else if (result !== null && result !== undefined) {
Expand All @@ -709,7 +714,10 @@ function sendTrailer (payload, res, reply) {

function sendStreamTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
payload.on('end', () => sendTrailer(null, res, reply))

// we need to attach stream as soon as possible
// otherwise, the payload will never flow
sendTrailer(payload, res, reply)
}

function onErrorHook (reply, error, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -176,6 +176,7 @@
"@fastify/fast-json-stringify-compiler": "^4.1.0",
"abstract-logging": "^2.0.1",
"avvio": "^8.2.0",
"cloneable-readable": "^3.0.0",
"find-my-way": "^7.3.0",
"light-my-request": "^5.6.1",
"pino": "^8.5.0",
Expand Down
83 changes: 65 additions & 18 deletions test/reply-trailers.test.js
Expand Up @@ -111,29 +111,76 @@ test('send trailers when payload is json', t => {
})

test('send trailers when payload is stream', t => {
t.plan(7)
t.plan(2)

const fastify = Fastify()
t.test('single trailers', t => {
t.plan(7)

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.same(payload, null)
done(null, 'custom-etag')
const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.same(typeof payload.pipe === 'function', true)
done(null, 'custom-etag')
})
const stream = Readable.from([JSON.stringify({ hello: 'world' })])
reply.send(stream)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'etag')
t.equal(res.trailers.etag, 'custom-etag')
t.notHas(res.headers, 'content-length')
})
const stream = Readable.from([JSON.stringify({ hello: 'world' })])
reply.send(stream)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'etag')
t.equal(res.trailers.etag, 'custom-etag')
t.notHas(res.headers, 'content-length')
t.test('multiple trailers', t => {
t.plan(9)

const fastify = Fastify()
const data = JSON.stringify({ hello: 'world' })
const hash = createHash('md5')
hash.update(data)
const md5 = hash.digest('hex')

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.same(typeof payload.pipe === 'function', true)
payload.on('end', () => {
done(null, 'custom-etag')
})
})
reply.trailer('Content-MD5', function (reply, payload, done) {
t.same(typeof payload.pipe === 'function', true)
const hash = createHash('md5')
payload.pipe(hash)
payload.on('end', () => {
hash.end()
done(null, hash.read().toString('hex'))
})
})
const stream = Readable.from([data])
reply.send(stream)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'etag content-md5')
t.equal(res.trailers.etag, 'custom-etag')
t.equal(res.trailers['content-md5'], md5)
t.notHas(res.headers, 'content-length')
})
})
})

Expand Down

0 comments on commit 670641f

Please sign in to comment.