Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trailers async race condition #4383

Merged
merged 3 commits into from Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 26 additions & 10 deletions lib/reply.js
Expand Up @@ -570,8 +570,6 @@ function onSendEnd (reply, payload) {

res.writeHead(statusCode, reply[kReplyHeaders])
sendTrailer(payload, res, reply)
// avoid ArgumentsAdaptorTrampoline from V8
res.end(null, null, null)
return
}

Expand Down Expand Up @@ -600,8 +598,6 @@ function onSendEnd (reply, payload) {
res.write(payload)
// then send trailers
sendTrailer(payload, res, reply)
// avoid ArgumentsAdaptorTrampoline from V8
res.end(null, null, null)
}

function logStreamError (logger, err, res) {
Expand Down Expand Up @@ -670,12 +666,29 @@ function sendStream (payload, res, reply) {
}

function sendTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
if (reply[kReplyTrailers] === null) {
// when no trailer, we close the stream
climba03003 marked this conversation as resolved.
Show resolved Hide resolved
res.end(null, null, null) // avoid ArgumentsAdaptorTrampoline from V8
return
}
const trailerHeaders = Object.keys(reply[kReplyTrailers])
const trailers = {}
let handled = 0
let skipped = true
function send () {
// add trailers when all handler handled
/* istanbul ignore else */
if (handled === 0) {
res.addTrailers(trailers)
// we need to properly close the stream
// after trailers sent
res.end(null, null, null) // avoid ArgumentsAdaptorTrampoline from V8
}
}

for (const trailerName of trailerHeaders) {
if (typeof reply[kReplyTrailers][trailerName] !== 'function') continue
skipped = false
handled--

function cb (err, value) {
Expand All @@ -689,11 +702,10 @@ function sendTrailer (payload, res, reply) {
if (err) reply.log.debug(err)
else trailers[trailerName] = value

// add trailers when all handler handled
/* istanbul ignore else */
if (handled === 0) {
res.addTrailers(trailers)
}
// we push the check to the end of event
// loop, so the registration continue to
// process.
process.nextTick(send)
}

const result = reply[kReplyTrailers][trailerName](reply, payload, cb)
Expand All @@ -705,6 +717,10 @@ function sendTrailer (payload, res, reply) {
cb(null, result)
}
}

// when all trailers are skipped
// we need to close the stream
if (skipped) res.end(null, null, null) // avoid ArgumentsAdaptorTrampoline from V8
}

function sendStreamTrailer (payload, res, reply) {
Expand Down
110 changes: 110 additions & 0 deletions test/reply-trailers.test.js
Expand Up @@ -5,6 +5,8 @@ const test = t.test
const Fastify = require('..')
const { Readable } = require('stream')
const { createHash } = require('crypto')
const { promisify } = require('util')
const sleep = promisify(setTimeout)

test('send trailers when payload is empty string', t => {
t.plan(5)
Expand Down Expand Up @@ -216,6 +218,82 @@ test('should emit deprecation warning when using direct return', t => {
})
})

test('trailer handler counter', t => {
t.plan(2)

const data = JSON.stringify({ hello: 'world' })
const hash = createHash('md5')
hash.update(data)
const md5 = hash.digest('hex')

t.test('callback with timeout', t => {
t.plan(9)
const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('Return-Early', function (reply, payload, done) {
t.equal(data, payload)
done(null, 'return')
})
reply.trailer('Content-MD5', function (reply, payload, done) {
t.equal(data, payload)
const hash = createHash('md5')
hash.update(payload)
setTimeout(() => {
done(null, hash.digest('hex'))
}, 500)
})
reply.send(data)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'return-early content-md5')
t.equal(res.trailers['return-early'], 'return')
t.equal(res.trailers['content-md5'], md5)
t.notHas(res.headers, 'content-length')
})
})

t.test('async-await', t => {
t.plan(9)
const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('Return-Early', async function (reply, payload) {
t.equal(data, payload)
return 'return'
})
reply.trailer('Content-MD5', async function (reply, payload) {
t.equal(data, payload)
const hash = createHash('md5')
hash.update(payload)
await sleep(500)
return hash.digest('hex')
})
reply.send(data)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'return-early content-md5')
t.equal(res.trailers['return-early'], 'return')
t.equal(res.trailers['content-md5'], md5)
t.notHas(res.headers, 'content-length')
})
})
})

test('removeTrailer', t => {
t.plan(6)

Expand Down Expand Up @@ -247,6 +325,38 @@ test('removeTrailer', t => {
})
})

test('remove all trailers', t => {
t.plan(6)

const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.fail('it should not called as this trailer is removed')
done(null, 'custom-etag')
})
reply.removeTrailer('ETag')
reply.trailer('Should-Not-Call', function (reply, payload, done) {
t.fail('it should not called as this trailer is removed')
done(null, 'should-not-call')
})
reply.removeTrailer('Should-Not-Call')
reply.send('')
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.notOk(res.headers.trailer)
t.notOk(res.trailers.etag)
t.notOk(res.trailers['should-not-call'])
t.notHas(res.headers, 'content-length')
})
})

test('hasTrailer', t => {
t.plan(10)

Expand Down