diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md index e307b247ee3..ecc3cfd61f7 100644 --- a/docs/docs/api/Dispatcher.md +++ b/docs/docs/api/Dispatcher.md @@ -952,6 +952,38 @@ const client = new Client("http://example.com").compose( ); ``` +##### `dump` + +The `dump` interceptor enables you to dump the response body from a request upon a given limit. + +**Options** +- `maxSize` - The maximum size (in bytes) of the response body to dump. If the size of the request's body exceeds this value then the connection will be closed. Default: `1048576`. + +> The `Dispatcher#options` also gets extended with the options `dumpMaxSize`, `abortOnDumped`, and `waitForTrailers` which can be used to configure the interceptor at a request-per-request basis. + +**Example - Basic Dump Interceptor** + +```js +const { Client, interceptors } = require("undici"); +const { dump } = interceptors; + +const client = new Client("http://example.com").compose( + dump({ + maxSize: 1024, + }) +); + +// or +client.dispatch( + { + path: "/", + method: "GET", + dumpMaxSize: 1024, + }, + handler +); +``` + ## Instance Events ### Event: `'connect'` diff --git a/index.js b/index.js index dd5ada84bc6..7a68d04abb3 100644 --- a/index.js +++ b/index.js @@ -40,7 +40,8 @@ module.exports.RedirectHandler = RedirectHandler module.exports.createRedirectInterceptor = createRedirectInterceptor module.exports.interceptors = { redirect: require('./lib/interceptor/redirect'), - retry: require('./lib/interceptor/retry') + retry: require('./lib/interceptor/retry'), + dump: require('./lib/interceptor/dump') } module.exports.buildConnector = buildConnector diff --git a/lib/interceptor/dump.js b/lib/interceptor/dump.js new file mode 100644 index 00000000000..fc9cacb198d --- /dev/null +++ b/lib/interceptor/dump.js @@ -0,0 +1,123 @@ +'use strict' + +const util = require('../core/util') +const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') +const DecoratorHandler = require('../handler/decorator-handler') + +class DumpHandler extends DecoratorHandler { + #maxSize = 1024 * 1024 + #abort = null + #dumped = false + #aborted = false + #size = 0 + #reason = null + #handler = null + + constructor ({ maxSize }, handler) { + super(handler) + + if (maxSize != null && (!Number.isFinite(maxSize) || maxSize < 1)) { + throw new InvalidArgumentError('maxSize must be a number greater than 0') + } + + this.#maxSize = maxSize ?? this.#maxSize + this.#handler = handler + } + + onConnect (abort) { + this.#abort = abort + + this.#handler.onConnect(this.#customAbort.bind(this)) + } + + #customAbort (reason) { + this.#aborted = true + this.#reason = reason + } + + // TODO: will require adjustment after new hooks are out + onHeaders (statusCode, rawHeaders, resume, statusMessage) { + const headers = util.parseHeaders(rawHeaders) + const contentLength = headers['content-length'] + + if (contentLength != null && contentLength > this.#maxSize) { + throw new RequestAbortedError( + `Response size (${contentLength}) larger than maxSize (${ + this.#maxSize + })` + ) + } + + if (this.#aborted) { + return true + } + + return this.#handler.onHeaders( + statusCode, + rawHeaders, + resume, + statusMessage + ) + } + + onError (err) { + if (this.#dumped) { + return + } + + err = this.#reason ?? err + + this.#handler.onError(err) + } + + onData (chunk) { + this.#size = this.#size + chunk.length + + if (this.#size >= this.#maxSize) { + this.#dumped = true + + if (this.#aborted) { + this.#handler.onError(this.#reason) + } else { + this.#handler.onComplete([]) + } + } + + return true + } + + onComplete (trailers) { + if (this.#dumped) { + return + } + + if (this.#aborted) { + this.#handler.onError(this.reason) + return + } + + this.#handler.onComplete(trailers) + } +} + +function createDumpInterceptor ( + { maxSize: defaultMaxSize } = { + maxSize: 1024 * 1024 + } +) { + return dispatch => { + return function Intercept (opts, handler) { + const { dumpMaxSize = defaultMaxSize } = + opts + + const dumpHandler = new DumpHandler( + { maxSize: dumpMaxSize }, + handler + ) + + return dispatch(opts, dumpHandler) + } + } +} + +module.exports = createDumpInterceptor diff --git a/test/interceptors/dump-interceptor.js b/test/interceptors/dump-interceptor.js new file mode 100644 index 00000000000..ba6dbe83c02 --- /dev/null +++ b/test/interceptors/dump-interceptor.js @@ -0,0 +1,525 @@ +'use strict' +const { platform } = require('node:os') +const { test, after } = require('node:test') +const { createServer } = require('node:http') +const { once } = require('node:events') +const { tspl } = require('@matteo.collina/tspl') + +const { Client, interceptors } = require('../..') +const { dump } = interceptors + +if (platform() === 'win32') { + // TODO: Fix tests on windows + console.log('Skipping test on Windows') + process.exit(0) +} + +test('Should dump on abort', async t => { + t = tspl(t, { plan: 2 }) + let offset = 0 + const server = createServer((req, res) => { + const max = 1024 * 1024 + const buffer = Buffer.alloc(max) + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream' + }) + + const interval = setInterval(() => { + offset += 256 + const chunk = buffer.subarray(offset - 256, offset) + + if (offset === max) { + clearInterval(interval) + res.end(chunk) + return + } + + res.write(chunk) + }, 0) + }) + + const abc = new AbortController() + + const requestOptions = { + method: 'GET', + path: '/', + signal: abc.signal + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump({ maxSize: 512 })) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request(requestOptions) + + abc.abort() + + try { + await response.body.text() + } catch (error) { + t.equal(response.statusCode, 200) + t.equal(error.name, 'AbortError') + } + + await t.completed +}) + +test('Should dump on already aborted request', async t => { + t = tspl(t, { plan: 3 }) + let offset = 0 + const server = createServer((req, res) => { + const max = 1024 + const buffer = Buffer.alloc(max) + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream' + }) + + res.once('close', () => { + t.equal(offset, 1024) + }) + + const interval = setInterval(() => { + offset += 256 + const chunk = buffer.subarray(offset - 256, offset) + + if (offset === max) { + clearInterval(interval) + res.end(chunk) + return + } + + res.write(chunk) + }, 0) + }) + + const abc = new AbortController() + + const requestOptions = { + method: 'GET', + path: '/', + signal: abc.signal + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump({ maxSize: 512 })) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + abc.abort() + client.request(requestOptions).catch(err => { + t.equal(err.name, 'AbortError') + t.equal(err.message, 'This operation was aborted') + }) + + await t.completed +}) + +test('Should dump response body up to limit (default)', async t => { + t = tspl(t, { plan: 3 }) + const server = createServer((req, res) => { + const buffer = Buffer.alloc(1024 * 1024) + res.writeHead(200, { + 'Content-Length': buffer.length, + 'Content-Type': 'application/octet-stream' + }) + + res.end(buffer) + }) + + const requestOptions = { + method: 'GET', + path: '/' + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request(requestOptions) + const body = await response.body.text() + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-length'], `${1024 * 1024}`) + t.equal(body, '') + + await t.completed +}) + +test('Should dump response body up to limit and ignore trailers', async t => { + t = tspl(t, { plan: 3 }) + const server = createServer((req, res) => { + res.writeHead(200, { + 'Content-Type': 'text/plain', + 'Transfer-Encoding': 'chunked', + Trailer: 'X-Foo' + }) + + res.write(Buffer.alloc(1024 * 1024).toString('utf-8')) + res.addTrailers({ 'X-Foo': 'bar' }) + res.end() + }) + + const requestOptions = { + method: 'GET', + path: '/' + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request(requestOptions) + const body = await response.body.text() + + t.equal(response.statusCode, 200) + t.equal(body, '') + t.equal(response.trailers['x-foo'], undefined) + + await t.completed +}) + +test('Should forward common error', async t => { + t = tspl(t, { plan: 1 }) + const server = createServer((req, res) => { + res.destroy() + }) + + const requestOptions = { + method: 'GET', + path: '/' + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + await t.rejects(client.request.bind(client, requestOptions), { + name: 'SocketError', + code: 'UND_ERR_SOCKET', + message: 'other side closed' + }) + + await t.completed +}) + +test('Should throw on bad opts', async t => { + t = tspl(t, { plan: 6 }) + + t.throws( + () => { + new Client('http://localhost').compose(dump({ maxSize: {} })).dispatch( + { + method: 'GET', + path: '/' + }, + {} + ) + }, + { + name: 'InvalidArgumentError', + message: 'maxSize must be a number greater than 0' + } + ) + t.throws( + () => { + new Client('http://localhost').compose(dump({ maxSize: '0' })).dispatch( + { + method: 'GET', + path: '/' + }, + {} + ) + }, + { + name: 'InvalidArgumentError', + message: 'maxSize must be a number greater than 0' + } + ) + t.throws( + () => { + new Client('http://localhost').compose(dump({ maxSize: -1 })).dispatch( + { + method: 'GET', + path: '/' + }, + {} + ) + }, + { + name: 'InvalidArgumentError', + message: 'maxSize must be a number greater than 0' + } + ) + t.throws( + () => { + new Client('http://localhost').compose(dump()).dispatch( + { + method: 'GET', + path: '/', + dumpMaxSize: {} + }, + {} + ) + }, + { + name: 'InvalidArgumentError', + message: 'maxSize must be a number greater than 0' + } + ) + t.throws( + () => { + new Client('http://localhost').compose(dump()).dispatch( + { + method: 'GET', + path: '/', + dumpMaxSize: '0' + }, + {} + ) + }, + { + name: 'InvalidArgumentError', + message: 'maxSize must be a number greater than 0' + } + ) + t.throws( + () => { + new Client('http://localhost').compose(dump()).dispatch( + { + method: 'GET', + path: '/', + dumpMaxSize: -1 + }, + {} + ) + }, + { + name: 'InvalidArgumentError', + message: 'maxSize must be a number greater than 0' + } + ) +}) + +test('Should dump response body up to limit (opts)', async t => { + t = tspl(t, { plan: 3 }) + const server = createServer((req, res) => { + const buffer = Buffer.alloc(1 * 1024) + res.writeHead(200, { + 'Content-Length': buffer.length, + 'Content-Type': 'application/octet-stream' + }) + res.end(buffer) + }) + + const requestOptions = { + method: 'GET', + path: '/' + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump({ maxSize: 1 * 1024 })) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request(requestOptions) + const body = await response.body.text() + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-length'], `${1 * 1024}`) + t.equal(body, '') + + await t.completed +}) + +test('Should abort if content length grater than max size', async t => { + t = tspl(t, { plan: 1 }) + const server = createServer((req, res) => { + const buffer = Buffer.alloc(2 * 1024) + res.writeHead(200, { + 'Content-Length': buffer.length, + 'Content-Type': 'application/octet-stream' + }) + res.end(buffer) + }) + + const requestOptions = { + method: 'GET', + path: '/' + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump({ maxSize: 1 * 1024, abortOnDumped: false })) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + t.rejects(client.request(requestOptions), { + name: 'AbortError', + message: 'Response size (2048) larger than maxSize (1024)' + }) + + await t.completed +}) + +test('Should dump response body up to limit (dispatch opts)', async t => { + t = tspl(t, { plan: 3 }) + const server = createServer((req, res) => { + const buffer = Buffer.alloc(1 * 1024) + res.writeHead(200, { + 'Content-Length': buffer.length, + 'Content-Type': 'application/octet-stream' + }) + res.end(buffer) + }) + + const requestOptions = { + method: 'GET', + path: '/', + dumpMaxSize: 1 * 1024, + abortOnDumped: false + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request(requestOptions) + const body = await response.body.text() + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-length'], `${1 * 1024}`) + t.equal(body, '') + + await t.completed +}) + +test('Should abort if content length grater than max size (dispatch opts)', async t => { + t = tspl(t, { plan: 1 }) + const server = createServer((req, res) => { + const buffer = Buffer.alloc(2 * 1024) + res.writeHead(200, { + 'Content-Length': buffer.length, + 'Content-Type': 'application/octet-stream' + }) + res.end(buffer) + }) + + const requestOptions = { + method: 'GET', + path: '/', + dumpMaxSize: 100 + } + + server.listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(dump()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + await t.rejects( + async () => { + return await client.request(requestOptions).then(res => res.body.text()) + }, + { + name: 'AbortError', + message: 'Response size (2048) larger than maxSize (100)' + } + ) + + await t.completed +}) diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index 5e030b34cac..39e413c02c8 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -13,6 +13,9 @@ expectAssignable(Undici.Request) expectAssignable(Undici.FormData) expectAssignable(Undici.File) expectAssignable(Undici.FileReader) +expectAssignable(Undici.interceptors.dump()) +expectAssignable(Undici.interceptors.redirect()) +expectAssignable(Undici.interceptors.retry()) const client = new Undici.Client('', {}) const handler: Dispatcher.DispatchHandlers = {} diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index c472c8bec05..0aa2aba00e3 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -19,8 +19,8 @@ declare class Dispatcher extends EventEmitter { connect(options: Dispatcher.ConnectOptions): Promise; connect(options: Dispatcher.ConnectOptions, callback: (err: Error | null, data: Dispatcher.ConnectData) => void): void; /** Compose a chain of dispatchers */ - compose(dispatchers: Dispatcher.DispatcherInterceptor[]): Dispatcher.ComposedDispatcher; - compose(...dispatchers: Dispatcher.DispatcherInterceptor[]): Dispatcher.ComposedDispatcher; + compose(dispatchers: Dispatcher.DispatcherComposeInterceptor[]): Dispatcher.ComposedDispatcher; + compose(...dispatchers: Dispatcher.DispatcherComposeInterceptor[]): Dispatcher.ComposedDispatcher; /** Performs an HTTP request. */ request(options: Dispatcher.RequestOptions): Promise; request(options: Dispatcher.RequestOptions, callback: (err: Error | null, data: Dispatcher.ResponseData) => void): void; @@ -97,7 +97,7 @@ declare class Dispatcher extends EventEmitter { declare namespace Dispatcher { export interface ComposedDispatcher extends Dispatcher {} - export type DispatcherInterceptor = (dispatch: Dispatcher['dispatch']) => Dispatcher['dispatch']; + export type DispatcherComposeInterceptor = (dispatch: Dispatcher['dispatch']) => Dispatcher['dispatch']; export interface DispatchOptions { origin?: string | URL; path: string; diff --git a/types/index.d.ts b/types/index.d.ts index 08f0684d96d..9e5eaeb3d54 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -66,4 +66,9 @@ declare namespace Undici { var File: typeof import('./file').File; var FileReader: typeof import('./filereader').FileReader; var caches: typeof import('./cache').caches; + var interceptors: { + dump: typeof import('./interceptors').dump; + retry: typeof import('./interceptors').retry; + redirect: typeof import('./interceptors').redirect; + } } diff --git a/types/interceptors.d.ts b/types/interceptors.d.ts index 047ac175d50..d546ac344e3 100644 --- a/types/interceptors.d.ts +++ b/types/interceptors.d.ts @@ -1,5 +1,11 @@ import Dispatcher from "./dispatcher"; +import RetryHandler from "./retry-handler"; -type RedirectInterceptorOpts = { maxRedirections?: number } +export type DumpInterceptorOpts = { maxSize?: number } +export type RetryInterceptorOpts = RetryHandler.RetryOptions +export type RedirectInterceptorOpts = { maxRedirections?: number } -export declare function createRedirectInterceptor (opts: RedirectInterceptorOpts): Dispatcher.DispatchInterceptor +export declare function createRedirectInterceptor (opts: RedirectInterceptorOpts): Dispatcher.DispatcherComposeInterceptor +export declare function dump(opts?: DumpInterceptorOpts): Dispatcher.DispatcherComposeInterceptor +export declare function retry(opts?: RetryInterceptorOpts): Dispatcher.DispatcherComposeInterceptor +export declare function redirect(opts?: RedirectInterceptorOpts): Dispatcher.DispatcherComposeInterceptor