Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(WIP): support stream trailer #4373

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 16 additions & 15 deletions lib/reply.js
@@ -1,6 +1,7 @@
'use strict'

const eos = require('stream').finished
const Cloneable = require('cloneable-readable')

const {
kFourOhFourContext,
Expand Down Expand Up @@ -618,8 +619,8 @@ function sendStream (payload, res, reply) {
let sourceOpen = true
let errorLogged = false

// set trailer when stream ended
sendStreamTrailer(payload, res, reply)
// clone stream for trailer
if (reply[kReplyTrailers] !== null) payload = Cloneable(payload)

eos(payload, { readable: true, writable: false }, function (err) {
sourceOpen = false
Expand Down Expand Up @@ -666,18 +667,21 @@ function sendStream (payload, res, reply) {
} else {
reply.log.warn('response will send, but you shouldn\'t use res.writeHead in stream mode')
}
payload.pipe(res)

payload.pipe(res, { end: reply[kReplyTrailers] === null })
climba03003 marked this conversation as resolved.
Show resolved Hide resolved
// stream will start flowing when the trailer handler attached
// all the cloned stream
sendTrailer(payload, res, reply)
}

function sendTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
const trailerHeaders = Object.keys(reply[kReplyTrailers])
const trailers = {}
let handled = 0
for (const trailerName of trailerHeaders) {
if (typeof reply[kReplyTrailers][trailerName] !== 'function') continue
handled--

const _trailers = Object.entries(reply[kReplyTrailers]).filter(([k, v]) => typeof v === 'function')
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there good choice without using .filter?
The previous implementation using a single variables will count wrong and have racing condition.

cc @Uzlopak maybe you will have some idea?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can implement the same with a for loop


for (const [name, func] of _trailers) {
function cb (err, value) {
// TODO: we may protect multiple callback calls
// or mixing async-await with callback
Expand All @@ -687,16 +691,18 @@ function sendTrailer (payload, res, reply) {
// since it does affect the client
// we log in here only for debug usage
if (err) reply.log.debug(err)
else trailers[trailerName] = value
else trailers[name] = value

// add trailers when all handler handled
/* istanbul ignore else */
if (handled === 0) {
if (_trailers.length === handled) {
res.addTrailers(trailers)
// end the stream properly
res.end(null, null, null)
}
}

const result = reply[kReplyTrailers][trailerName](reply, payload, cb)
const result = func(reply, payload instanceof Cloneable ? payload.clone() : payload, cb)
if (typeof result === 'object' && typeof result.then === 'function') {
result.then((v) => cb(null, v), cb)
} else if (result !== null && result !== undefined) {
Expand All @@ -707,11 +713,6 @@ function sendTrailer (payload, res, reply) {
}
}

function sendStreamTrailer (payload, res, reply) {
if (reply[kReplyTrailers] === null) return
payload.on('end', () => sendTrailer(null, res, reply))
}

function onErrorHook (reply, error, cb) {
if (reply[kRouteContext].onError !== null && !reply[kReplyNextErrorHandler]) {
reply[kReplyIsRunningOnErrorHook] = true
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -176,6 +176,7 @@
"@fastify/fast-json-stringify-compiler": "^4.1.0",
"abstract-logging": "^2.0.1",
"avvio": "^8.2.0",
"cloneable-readable": "^3.0.0",
"find-my-way": "^7.3.0",
"light-my-request": "^5.6.1",
"pino": "^8.5.0",
Expand Down
47 changes: 45 additions & 2 deletions test/reply-trailers.test.js
Expand Up @@ -110,14 +110,14 @@ test('send trailers when payload is json', t => {
})
})

test('send trailers when payload is stream', t => {
t.test('send trailers when payload is stream - single', t => {
t.plan(7)

const fastify = Fastify()

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.same(payload, null)
t.same(typeof payload.pipe === 'function', true)
done(null, 'custom-etag')
})
const stream = Readable.from([JSON.stringify({ hello: 'world' })])
Expand All @@ -137,6 +137,49 @@ test('send trailers when payload is stream', t => {
})
})

t.test('send trailers when payload is stream - multiple', t => {
t.plan(9)

const fastify = Fastify()
const data = JSON.stringify({ hello: 'world' })
const hash = createHash('md5')
hash.update(data)
const md5 = hash.digest('hex')

fastify.get('/', function (request, reply) {
reply.trailer('ETag', function (reply, payload, done) {
t.same(typeof payload.pipe === 'function', true)
payload.on('end', () => {
done(null, 'custom-etag')
})
})
reply.trailer('Content-MD5', function (reply, payload, done) {
t.same(typeof payload.pipe === 'function', true)
const hash = createHash('md5')
payload.pipe(hash)
payload.on('end', () => {
hash.end()
done(null, hash.read().toString('hex'))
})
})
const stream = Readable.from([data])
reply.send(stream)
})

fastify.inject({
method: 'GET',
url: '/'
}, (error, res) => {
t.error(error)
t.equal(res.statusCode, 200)
t.equal(res.headers['transfer-encoding'], 'chunked')
t.equal(res.headers.trailer, 'etag content-md5')
t.equal(res.trailers.etag, 'custom-etag')
t.equal(res.trailers['content-md5'], md5)
t.notHas(res.headers, 'content-length')
})
})

test('send trailers when using async-await', t => {
t.plan(5)

Expand Down