Skip to content

Commit

Permalink
fix: trailers async race condition (#4383)
Browse files Browse the repository at this point in the history
* fix: trailers async race condition

* fix: import

* refactor: typo and comment
  • Loading branch information
climba03003 committed Oct 28, 2022
1 parent 2065c11 commit 183576d
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 10 deletions.
36 changes: 26 additions & 10 deletions lib/reply.js
Expand Up @@ -570,8 +570,6 @@ function onSendEnd (reply, payload) {

res.writeHead(statusCode, reply[kReplyHeaders])
sendTrailer(payload, res, reply)
// avoid ArgumentsAdaptorTrampoline from V8
res.end(null, null, null)
return
}

Expand Down Expand Up @@ -600,8 +598,6 @@ function onSendEnd (reply, payload) {
res.write(payload)
// then send trailers
sendTrailer(payload, res, reply)
// avoid ArgumentsAdaptorTrampoline from V8
res.end(null, null, null)
}

function logStreamError (logger, err, res) {
Expand Down Expand Up @@ -670,12 +666,29 @@ function sendStream (payload, res, reply) {
}

function sendTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
if (reply[kReplyTrailers] === null) {
// when no trailer, we close the stream
res.end(null, null, null) // avoid ArgumentsAdaptorTrampoline from V8
return
}
const trailerHeaders = Object.keys(reply[kReplyTrailers])
const trailers = {}
let handled = 0
let skipped = true
function send () {
// add trailers when all handler handled
/* istanbul ignore else */
if (handled === 0) {
res.addTrailers(trailers)
// we need to properly close the stream
// after trailers sent
res.end(null, null, null) // avoid ArgumentsAdaptorTrampoline from V8
}
}

for (const trailerName of trailerHeaders) {
if (typeof reply[kReplyTrailers][trailerName] !== 'function') continue
skipped = false
handled--

function cb (err, value) {
Expand All @@ -689,11 +702,10 @@ function sendTrailer (payload, res, reply) {
if (err) reply.log.debug(err)
else trailers[trailerName] = value

// add trailers when all handler handled
/* istanbul ignore else */
if (handled === 0) {
res.addTrailers(trailers)
}
// we push the check to the end of event
// loop, so the registration continue to
// process.
process.nextTick(send)
}

const result = reply[kReplyTrailers][trailerName](reply, payload, cb)
Expand All @@ -705,6 +717,10 @@ function sendTrailer (payload, res, reply) {
cb(null, result)
}
}

// when all trailers are skipped
// we need to close the stream
if (skipped) res.end(null, null, null) // avoid ArgumentsAdaptorTrampoline from V8
}

function sendStreamTrailer (payload, res, reply) {
Expand Down
110 changes: 110 additions & 0 deletions test/reply-trailers.test.js
Expand Up @@ -5,6 +5,8 @@ const test = t.test
const Fastify = require('..')
const { Readable } = require('stream')
const { createHash } = require('crypto')
const { promisify } = require('util')
const sleep = promisify(setTimeout)

test('send trailers when payload is empty string', t => {
t.plan(5)
Expand Down Expand Up @@ -216,6 +218,82 @@ test('should emit deprecation warning when using direct return', t => {
})
})

test('trailer handler counter', t => {
t.plan(2)

const data = JSON.stringify({ hello: 'world' })
const hash = createHash('md5')
hash.update(data)
const md5 = hash.digest('hex')

t.test('callback with timeout', t => {
t.plan(9)
const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('Return-Early', function (reply, payload, done) {
t.equal(data, payload)
done(null, 'return')
})
reply.trailer('Content-MD5', function (reply, payload, done) {
t.equal(data, payload)
const hash = createHash('md5')
hash.update(payload)
setTimeout(() => {
done(null, hash.digest('hex'))
}, 500)
})
reply.send(data)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'return-early content-md5')
t.equal(res.trailers['return-early'], 'return')
t.equal(res.trailers['content-md5'], md5)
t.notHas(res.headers, 'content-length')
})
})

t.test('async-await', t => {
t.plan(9)
const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('Return-Early', async function (reply, payload) {
t.equal(data, payload)
return 'return'
})
reply.trailer('Content-MD5', async function (reply, payload) {
t.equal(data, payload)
const hash = createHash('md5')
hash.update(payload)
await sleep(500)
return hash.digest('hex')
})
reply.send(data)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'return-early content-md5')
t.equal(res.trailers['return-early'], 'return')
t.equal(res.trailers['content-md5'], md5)
t.notHas(res.headers, 'content-length')
})
})
})

test('removeTrailer', t => {
t.plan(6)

Expand Down Expand Up @@ -247,6 +325,38 @@ test('removeTrailer', t => {
})
})

test('remove all trailers', t => {
t.plan(6)

const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.fail('it should not called as this trailer is removed')
done(null, 'custom-etag')
})
reply.removeTrailer('ETag')
reply.trailer('Should-Not-Call', function (reply, payload, done) {
t.fail('it should not called as this trailer is removed')
done(null, 'should-not-call')
})
reply.removeTrailer('Should-Not-Call')
reply.send('')
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.notOk(res.headers.trailer)
t.notOk(res.trailers.etag)
t.notOk(res.trailers['should-not-call'])
t.notHas(res.headers, 'content-length')
})
})

test('hasTrailer', t => {
t.plan(10)

Expand Down

0 comments on commit 183576d

Please sign in to comment.