Skip to content

Commit

Permalink
feat: dispatch compose
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 26, 2024
1 parent 79eb238 commit 00118ad
Show file tree
Hide file tree
Showing 35 changed files with 1,787 additions and 1,976 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -79,3 +79,5 @@ fuzz-results-*.json
# Bundle output
undici-fetch.js
/test/imports/undici-import.js

.tap
2 changes: 0 additions & 2 deletions docs/docs/api/Agent.md
Expand Up @@ -19,8 +19,6 @@ Returns: `Agent`
Extends: [`PoolOptions`](Pool.md#parameter-pooloptions)

* **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Pool(origin, opts)`
* **maxRedirections** `Integer` - Default: `0`. The number of HTTP redirection to follow unless otherwise specified in `DispatchOptions`.
* **interceptors** `{ Agent: DispatchInterceptor[] }` - Default: `[RedirectInterceptor]` - A list of interceptors that are applied to the dispatch method. Additional logic can be applied (such as, but not limited to: 302 status code handling, authentication, cookies, compression and caching). Note that the behavior of interceptors is Experimental and might change at any given time.

## Instance Properties

Expand Down
1 change: 0 additions & 1 deletion docs/docs/api/Client.md
Expand Up @@ -29,7 +29,6 @@ Returns: `Client`
* **pipelining** `number | null` (optional) - Default: `1` - The amount of concurrent requests to be sent over the single TCP/TLS connection according to [RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). Carefully consider your workload and environment before enabling concurrent requests as pipelining may reduce performance if used incorrectly. Pipelining is sensitive to network stack settings as well as head of line blocking caused by e.g. long running requests. Set to `0` to disable keep-alive connections.
* **connect** `ConnectOptions | Function | null` (optional) - Default: `null`.
* **strictContentLength** `Boolean` (optional) - Default: `true` - Whether to treat request content length mismatches as errors. If true, an error is thrown when the request content-length header doesn't match the length of the request body.
* **interceptors** `{ Client: DispatchInterceptor[] }` - Default: `[RedirectInterceptor]` - A list of interceptors that are applied to the dispatch method. Additional logic can be applied (such as, but not limited to: 302 status code handling, authentication, cookies, compression and caching). Note that the behavior of interceptors is Experimental and might change at any given time.
* **autoSelectFamily**: `boolean` (optional) - Default: depends on local Node version, on Node 18.13.0 and above is `false`. Enables a family autodetection algorithm that loosely implements section 5 of [RFC 8305](https://tools.ietf.org/html/rfc8305#section-5). See [here](https://nodejs.org/api/net.html#socketconnectoptions-connectlistener) for more details. This option is ignored if not supported by the current Node version.
* **autoSelectFamilyAttemptTimeout**: `number` - Default: depends on local Node version, on Node 18.13.0 and above is `250`. The amount of time in milliseconds to wait for a connection attempt to finish before trying the next address when using the `autoSelectFamily` option. See [here](https://nodejs.org/api/net.html#socketconnectoptions-connectlistener) for more details.
* **allowH2**: `boolean` - Default: `false`. Enables support for H2 if the server has assigned bigger priority to it through ALPN negotiation.
Expand Down
60 changes: 0 additions & 60 deletions docs/docs/api/DispatchInterceptor.md

This file was deleted.

17 changes: 5 additions & 12 deletions index.js
Expand Up @@ -5,8 +5,6 @@ const Dispatcher = require('./lib/dispatcher/dispatcher')
const Pool = require('./lib/dispatcher/pool')
const BalancedPool = require('./lib/dispatcher/balanced-pool')
const Agent = require('./lib/dispatcher/agent')
const ProxyAgent = require('./lib/dispatcher/proxy-agent')
const RetryAgent = require('./lib/dispatcher/retry-agent')
const errors = require('./lib/core/errors')
const util = require('./lib/core/util')
const { InvalidArgumentError } = errors
Expand All @@ -16,11 +14,7 @@ const MockClient = require('./lib/mock/mock-client')
const MockAgent = require('./lib/mock/mock-agent')
const MockPool = require('./lib/mock/mock-pool')
const mockErrors = require('./lib/mock/mock-errors')
const RetryHandler = require('./lib/handler/RetryHandler')
const { getGlobalDispatcher, setGlobalDispatcher } = require('./lib/global')
const DecoratorHandler = require('./lib/handler/DecoratorHandler')
const RedirectHandler = require('./lib/handler/RedirectHandler')
const createRedirectInterceptor = require('./lib/interceptor/redirectInterceptor')

Object.assign(Dispatcher.prototype, api)

Expand All @@ -29,13 +23,12 @@ module.exports.Client = Client
module.exports.Pool = Pool
module.exports.BalancedPool = BalancedPool
module.exports.Agent = Agent
module.exports.ProxyAgent = ProxyAgent
module.exports.RetryAgent = RetryAgent
module.exports.RetryHandler = RetryHandler

module.exports.DecoratorHandler = DecoratorHandler
module.exports.RedirectHandler = RedirectHandler
module.exports.createRedirectInterceptor = createRedirectInterceptor
module.exports.interceptor = {
redirect: require('./lib/interceptor/redirect'),
retry: require('./lib/interceptor/retry'),
proxy: require('./lib/interceptor/proxy')
}

module.exports.buildConnector = buildConnector
module.exports.errors = errors
Expand Down
6 changes: 4 additions & 2 deletions lib/api/api-connect.js
Expand Up @@ -3,6 +3,7 @@
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class ConnectHandler extends AsyncResource {
Expand Down Expand Up @@ -90,8 +91,9 @@ function connect (opts, callback) {
}

try {
const connectHandler = new ConnectHandler(opts, callback)
this.dispatch({ ...opts, method: 'CONNECT' }, connectHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'CONNECT' }, new ConnectHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
7 changes: 6 additions & 1 deletion lib/api/api-pipeline.js
Expand Up @@ -12,6 +12,7 @@ const {
} = require('../core/errors')
const util = require('../core/util')
const { AsyncResource } = require('node:async_hooks')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')
const assert = require('node:assert')

Expand Down Expand Up @@ -239,7 +240,11 @@ class PipelineHandler extends AsyncResource {
function pipeline (opts, handler) {
try {
const pipelineHandler = new PipelineHandler(opts, handler)
this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)

this
.compose(redirect(opts))
.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)

return pipelineHandler.ret
} catch (err) {
return new PassThrough().destroy(err)
Expand Down
5 changes: 4 additions & 1 deletion lib/api/api-request.js
Expand Up @@ -6,6 +6,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
Expand Down Expand Up @@ -166,7 +167,9 @@ function request (opts, callback) {
}

try {
this.dispatch(opts, new RequestHandler(opts, callback))
this
.compose(redirect(opts))
.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
5 changes: 4 additions & 1 deletion lib/api/api-stream.js
Expand Up @@ -7,6 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
Expand Down Expand Up @@ -207,7 +208,9 @@ function stream (opts, factory, callback) {
}

try {
this.dispatch(opts, new StreamHandler(opts, factory, callback))
this
.compose(redirect(opts))
.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
10 changes: 4 additions & 6 deletions lib/api/api-upgrade.js
Expand Up @@ -3,6 +3,7 @@
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const { AsyncResource } = require('node:async_hooks')
const util = require('../core/util')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')
const assert = require('node:assert')

Expand Down Expand Up @@ -87,12 +88,9 @@ function upgrade (opts, callback) {
}

try {
const upgradeHandler = new UpgradeHandler(opts, callback)
this.dispatch({
...opts,
method: opts.method || 'GET',
upgrade: opts.protocol || 'Websocket'
}, upgradeHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'GET', upgrade: opts?.protocol || 'Websocket' }, new UpgradeHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
1 change: 0 additions & 1 deletion lib/core/symbols.js
Expand Up @@ -50,7 +50,6 @@ module.exports = {
kMaxRequests: Symbol('maxRequestsPerClient'),
kProxy: Symbol('proxy agent options'),
kCounter: Symbol('socket request counter'),
kInterceptors: Symbol('dispatch interceptors'),
kMaxResponseSize: Symbol('max response size'),
kHTTP2Session: Symbol('http2Session'),
kHTTP2SessionState: Symbol('http2Session state'),
Expand Down
10 changes: 1 addition & 9 deletions lib/dispatcher/agent.js
@@ -1,12 +1,11 @@
'use strict'

const { InvalidArgumentError } = require('../core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('../core/symbols')
const { kClients, kRunning, kClose, kDestroy, kDispatch } = require('../core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
const util = require('../core/util')
const createRedirectInterceptor = require('../interceptor/redirectInterceptor')

const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
Expand Down Expand Up @@ -42,14 +41,7 @@ class Agent extends DispatcherBase {
connect = { ...connect }
}

this[kInterceptors] = options.interceptors?.Agent && Array.isArray(options.interceptors.Agent)
? options.interceptors.Agent
: [createRedirectInterceptor({ maxRedirections })]

this[kOptions] = { ...util.deepClone(options), connect }
this[kOptions].interceptors = options.interceptors
? { ...options.interceptors }
: undefined
this[kMaxRedirections] = maxRedirections
this[kFactory] = factory
this[kClients] = new Map()
Expand Down
5 changes: 1 addition & 4 deletions lib/dispatcher/balanced-pool.js
Expand Up @@ -13,7 +13,7 @@ const {
kGetDispatcher
} = require('./pool-base')
const Pool = require('./pool')
const { kUrl, kInterceptors } = require('../core/symbols')
const { kUrl } = require('../core/symbols')
const { parseOrigin } = require('../core/util')
const kFactory = Symbol('factory')

Expand Down Expand Up @@ -53,9 +53,6 @@ class BalancedPool extends PoolBase {
throw new InvalidArgumentError('factory must be a function.')
}

this[kInterceptors] = opts.interceptors?.BalancedPool && Array.isArray(opts.interceptors.BalancedPool)
? opts.interceptors.BalancedPool
: []
this[kFactory] = factory

for (const upstream of upstreams) {
Expand Down
10 changes: 0 additions & 10 deletions lib/dispatcher/client.js
Expand Up @@ -68,7 +68,6 @@ const {
kClose,
kDestroy,
kDispatch,
kInterceptors,
kLocalAddress,
kMaxResponseSize,
kHTTPConnVersion,
Expand Down Expand Up @@ -136,7 +135,6 @@ class Client extends DispatcherBase {
* @param {import('../../types/client.js').Client.Options} options
*/
constructor (url, {
interceptors,
maxHeaderSize,
headersTimeout,
socketTimeout,
Expand Down Expand Up @@ -223,10 +221,6 @@ class Client extends DispatcherBase {
throw new InvalidArgumentError('connect must be a function or an object')
}

if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
}
Expand Down Expand Up @@ -267,9 +261,6 @@ class Client extends DispatcherBase {
})
}

this[kInterceptors] = interceptors?.Client && Array.isArray(interceptors.Client)
? interceptors.Client
: [createRedirectInterceptor({ maxRedirections })]
this[kUrl] = util.parseOrigin(url)
this[kConnector] = connect
this[kSocket] = null
Expand Down Expand Up @@ -488,7 +479,6 @@ function onHTTP2GoAway (code) {
}

const constants = require('../llhttp/constants.js')
const createRedirectInterceptor = require('../interceptor/redirectInterceptor.js')
const EMPTY_BUF = Buffer.alloc(0)

async function lazyllhttp () {
Expand Down
36 changes: 2 additions & 34 deletions lib/dispatcher/dispatcher-base.js
Expand Up @@ -6,13 +6,12 @@ const {
ClientClosedError,
InvalidArgumentError
} = require('../core/errors')
const { kDestroy, kClose, kDispatch, kInterceptors } = require('../core/symbols')
const { kDestroy, kClose, kDispatch } = require('../core/symbols')

const kDestroyed = Symbol('destroyed')
const kClosed = Symbol('closed')
const kOnDestroyed = Symbol('onDestroyed')
const kOnClosed = Symbol('onClosed')
const kInterceptedDispatch = Symbol('Intercepted Dispatch')

class DispatcherBase extends Dispatcher {
constructor () {
Expand All @@ -32,23 +31,6 @@ class DispatcherBase extends Dispatcher {
return this[kClosed]
}

get interceptors () {
return this[kInterceptors]
}

set interceptors (newInterceptors) {
if (newInterceptors) {
for (let i = newInterceptors.length - 1; i >= 0; i--) {
const interceptor = this[kInterceptors][i]
if (typeof interceptor !== 'function') {
throw new InvalidArgumentError('interceptor must be an function')
}
}
}

this[kInterceptors] = newInterceptors
}

close (callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -144,20 +126,6 @@ class DispatcherBase extends Dispatcher {
})
}

[kInterceptedDispatch] (opts, handler) {
if (!this[kInterceptors] || this[kInterceptors].length === 0) {
this[kInterceptedDispatch] = this[kDispatch]
return this[kDispatch](opts, handler)
}

let dispatch = this[kDispatch].bind(this)
for (let i = this[kInterceptors].length - 1; i >= 0; i--) {
dispatch = this[kInterceptors][i](dispatch)
}
this[kInterceptedDispatch] = dispatch
return dispatch(opts, handler)
}

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
Expand All @@ -176,7 +144,7 @@ class DispatcherBase extends Dispatcher {
throw new ClientClosedError()
}

return this[kInterceptedDispatch](opts, handler)
return this[kDispatch](opts, handler)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
Expand Down

0 comments on commit 00118ad

Please sign in to comment.