Skip to content

Commit

Permalink
feat: replace into-stream to Readable.from (#290)
Browse files Browse the repository at this point in the history
* feat: replace into-stream to Readable.from

* test: regression test of issue 288

* fixup

* feat: support more types

* chore: fix deps

* fixup

* fixup

* fixup

* fixup

* refactor: update checking condition

* fixup

* chore: apply suggestion

Co-authored-by: Aras Abbasi <aras.abbasi@googlemail.com>
Signed-off-by: KaKa <23028015+climba03003@users.noreply.github.com>

* fixup

* fixup

---------

Signed-off-by: KaKa <23028015+climba03003@users.noreply.github.com>
Co-authored-by: Aras Abbasi <aras.abbasi@googlemail.com>
  • Loading branch information
climba03003 and Uzlopak committed Mar 29, 2024
1 parent 593e2d8 commit 848be9d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 17 deletions.
10 changes: 5 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ const fp = require('fastify-plugin')
const encodingNegotiator = require('@fastify/accept-negotiator')
const pump = require('pump')
const mimedb = require('mime-db')
const intoStream = require('into-stream')
const peek = require('peek-stream')
const { Minipass } = require('minipass')
const pumpify = require('pumpify')
const { Readable } = require('readable-stream')

const { isStream, isGzip, isDeflate } = require('./lib/utils')
const { isStream, isGzip, isDeflate, intoAsyncIterator } = require('./lib/utils')

const InvalidRequestEncodingError = createError('FST_CP_ERR_INVALID_CONTENT_ENCODING', 'Unsupported Content-Encoding: %s', 415)
const InvalidRequestCompressedPayloadError = createError('FST_CP_ERR_INVALID_CONTENT', 'Could not decompress the request payload using the provided encoding', 400)
Expand Down Expand Up @@ -276,7 +276,7 @@ function buildRouteCompress (fastify, params, routeOptions, decorateOnly) {
if (Buffer.byteLength(payload) < params.threshold) {
return next()
}
payload = intoStream(payload)
payload = Readable.from(intoAsyncIterator(payload))
}

setVaryHeader(reply)
Expand Down Expand Up @@ -400,7 +400,7 @@ function compress (params) {
if (Buffer.byteLength(payload) < params.threshold) {
return this.send(payload)
}
payload = intoStream(payload)
payload = Readable.from(intoAsyncIterator(payload))
}

setVaryHeader(this)
Expand Down Expand Up @@ -509,7 +509,7 @@ function maybeUnzip (payload, serialize) {
// handle case where serialize doesn't return a string or Buffer
if (!Buffer.isBuffer(buf)) return result
if (isCompressed(buf) === 0) return result
return intoStream(result)
return Readable.from(intoAsyncIterator(result))
}

function zipStream (deflate, encoding) {
Expand Down
43 changes: 42 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,45 @@ function isStream (stream) {
return stream !== null && typeof stream === 'object' && typeof stream.pipe === 'function'
}

module.exports = { isGzip, isDeflate, isStream }
/**
* Provide a async iteratable for Readable.from
*/
async function * intoAsyncIterator (payload) {
if (typeof payload === 'object') {
if (Buffer.isBuffer(payload)) {
yield payload
return
}

if (
// ArrayBuffer
payload instanceof ArrayBuffer ||
// NodeJS.TypedArray
ArrayBuffer.isView(payload)
) {
yield Buffer.from(payload)
return
}

// Iterator
if (Symbol.iterator in payload) {
for (const chunk of payload) {
yield chunk
}
return
}

// Async Iterator
if (Symbol.asyncIterator in payload) {
for await (const chunk of payload) {
yield chunk
}
return
}
}

// string
yield payload
}

module.exports = { isGzip, isDeflate, isStream, intoAsyncIterator }
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
"dependencies": {
"@fastify/accept-negotiator": "^1.1.0",
"fastify-plugin": "^4.5.0",
"into-stream": "^6.0.0",
"mime-db": "^1.52.0",
"minipass": "^7.0.2",
"peek-stream": "^1.1.3",
"pump": "^3.0.0",
"pumpify": "^2.0.1"
"pumpify": "^2.0.1",
"readable-stream": "^4.5.2"
},
"devDependencies": {
"@fastify/pre-commit": "^2.0.2",
Expand All @@ -26,7 +26,8 @@
"standard": "^17.1.0",
"tap": "^16.3.7",
"tsd": "^0.30.0",
"typescript": "^5.1.6"
"typescript": "^5.1.6",
"undici": "^5.28.3"
},
"scripts": {
"coverage": "npm run test:unit -- --coverage-report=html",
Expand Down
48 changes: 48 additions & 0 deletions test/issue-288.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const fastifyCompress = require('..')
const { request, setGlobalDispatcher, Agent } = require('undici')

setGlobalDispatcher(new Agent({
keepAliveTimeout: 10,
keepAliveMaxTimeout: 10
}))

test('should not corrupt the file content', async (t) => {
// provide 2 byte unicode content
const twoByteUnicodeContent = new Array(5_000)
.fill('0')
.map(() => {
const random = new Array(10).fill('A').join('🍃')
return random + '- FASTIFY COMPRESS,🍃 FASTIFY COMPRESS'
})
.join('\n')
const fastify = new Fastify()
t.teardown(() => fastify.close())

fastify.register(async (instance, opts) => {
await fastify.register(fastifyCompress)
// compression
instance.get('/issue', async (req, reply) => {
return twoByteUnicodeContent
})
})

// no compression
fastify.get('/good', async (req, reply) => {
return twoByteUnicodeContent
})

await fastify.listen({ port: 0 })

const { port } = fastify.server.address()
const url = `http://localhost:${port}`

const response = await request(`${url}/issue`)
const response2 = await request(`${url}/good`)
const body = await response.body.text()
const body2 = await response2.body.text()
t.equal(body, body2)
})
44 changes: 43 additions & 1 deletion test/utils.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { createReadStream } = require('node:fs')
const { Socket } = require('node:net')
const { Duplex, PassThrough, Readable, Stream, Transform, Writable } = require('node:stream')
const { test } = require('tap')
const { isStream, isDeflate, isGzip } = require('../lib/utils')
const { isStream, isDeflate, isGzip, intoAsyncIterator } = require('../lib/utils')

test('isStream() utility should be able to detect Streams', async (t) => {
t.plan(12)
Expand Down Expand Up @@ -61,3 +61,45 @@ test('isGzip() utility should be able to detect gzip compressed Buffer', async (
t.equal(isGzip(undefined), false)
t.equal(isGzip(''), false)
})

test('intoAsyncIterator() utility should handle different data', async (t) => {
t.plan(8)

const buf = Buffer.from('foo')
const str = 'foo'
const arr = [str, str]
const arrayBuffer = new ArrayBuffer(8)
const typedArray = new Int32Array(arrayBuffer)
const asyncIterator = (async function * () {
yield str
})()
const obj = {}

for await (const buffer of intoAsyncIterator(buf)) {
t.equal(buffer, buf)
}

for await (const string of intoAsyncIterator(str)) {
t.equal(string, str)
}

for await (const chunk of intoAsyncIterator(arr)) {
t.equal(chunk, str)
}

for await (const chunk of intoAsyncIterator(arrayBuffer)) {
t.equal(chunk.toString(), Buffer.from(arrayBuffer).toString())
}

for await (const chunk of intoAsyncIterator(typedArray)) {
t.equal(chunk.toString(), Buffer.from(typedArray).toString())
}

for await (const chunk of intoAsyncIterator(asyncIterator)) {
t.equal(chunk, str)
}

for await (const chunk of intoAsyncIterator(obj)) {
t.equal(chunk, obj)
}
})
21 changes: 14 additions & 7 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import {
FastifyPluginCallback,
FastifyReply,
FastifyRequest,
RouteOptions as FastifyRouteOptions,
RawServerBase,
RawServerDefault,
RouteOptions as FastifyRouteOptions
} from 'fastify'
import { Input, InputObject } from 'into-stream'
import { Stream } from 'stream'
import { BrotliOptions, ZlibOptions } from 'zlib'
RawServerDefault
} from 'fastify';
import { Stream } from 'stream';
import { BrotliOptions, ZlibOptions } from 'zlib';

declare module 'fastify' {
export interface FastifyContextConfig {
Expand All @@ -26,7 +25,7 @@ declare module 'fastify' {
}

interface FastifyReply {
compress(input: Stream | Input | InputObject): void;
compress(input: Stream | Input): void;
}

export interface RouteOptions {
Expand Down Expand Up @@ -61,6 +60,14 @@ type EncodingToken = 'br' | 'deflate' | 'gzip' | 'identity';

type CompressibleContentTypeFunction = (contentType: string) => boolean;

type Input =
| Buffer
| NodeJS.TypedArray
| ArrayBuffer
| string
| Iterable<Buffer | string>
| AsyncIterable<Buffer | string>;

declare namespace fastifyCompress {

export interface FastifyCompressOptions {
Expand Down

0 comments on commit 848be9d

Please sign in to comment.