From c4826ec0459d3a147a74c3faa9a0dd9c22ee23f7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 27 Nov 2021 21:05:53 +0100 Subject: [PATCH] fix: balanced pool (#1114) * fix: balanced pool * fixup * fixup --- lib/balanced-pool.js | 120 ++++++++-------------- lib/pool-base.js | 223 ++++++++++++++++++++++++++++++++++++++++ lib/pool.js | 229 +++++------------------------------------- test/balanced-pool.js | 4 +- test/pool.js | 2 +- 5 files changed, 294 insertions(+), 284 deletions(-) create mode 100644 lib/pool-base.js diff --git a/lib/balanced-pool.js b/lib/balanced-pool.js index 80bec15e4f6..f638f9ca2c8 100644 --- a/lib/balanced-pool.js +++ b/lib/balanced-pool.js @@ -1,21 +1,31 @@ 'use strict' -const { BalancedPoolMissingUpstreamError } = require('./core/errors') -const Dispatcher = require('./dispatcher') +const { + BalancedPoolMissingUpstreamError, + ClientClosedError, + InvalidArgumentError, + ClientDestroyedError +} = require('./core/errors') +const { + PoolBase, + kClients, + kNeedDrain, + kOnDrain, + kOnConnect, + kOnDisconnect, + kDispatch, + kOnConnectionError + } = require('./pool-base') const Pool = require('./pool') -const kPools = Symbol('kPools') -const kPoolOpts = Symbol('kPoolOpts') -const kUpstream = Symbol('kUpstream') -const kNeedDrain = Symbol('kNeedDrain') +const kOptions = Symbol('options') +const kUpstream = Symbol('upstream') -class BalancedPool extends Dispatcher { +class BalancedPool extends PoolBase { constructor (upstreams = [], opts = {}) { super() - this[kPools] = [] - this[kPoolOpts] = opts - this[kNeedDrain] = false + this[kOptions] = opts if (!Array.isArray(upstreams)) { upstreams = [upstreams] @@ -27,94 +37,52 @@ class BalancedPool extends Dispatcher { } addUpstream (upstream) { - if (this[kPools].find((pool) => pool[kUpstream] === upstream)) { + if (this[kClients].find((pool) => pool[kUpstream] === upstream)) { return this } - const pool = new Pool(upstream, Object.assign({}, this[kPoolOpts])) + const pool = new Pool(upstream, Object.assign({}, this[kOptions])) + .on('drain', this[kOnDrain]) + .on('connect', this[kOnConnect]) + .on('disconnect', this[kOnDisconnect]) + .on('connectionError', this[kOnConnectionError]) pool[kUpstream] = upstream - pool.on('connect', (...args) => { - this.emit('connect', ...args) - }) - - pool.on('disconnect', (...args) => { - this.emit('disconnect', ...args) - }) - - pool.on('drain', (...args) => { - pool[kNeedDrain] = false - - if (this[kNeedDrain]) { - this[kNeedDrain] = false - this.emit('drain', ...args) - } - }) - - this[kPools].push(pool) + this[kClients].push(pool) return this } - dispatch (opts, handler) { - // We validate that pools is greater than 0, - // otherwise we would have to wait until an upstream - // is added, which might never happen. - if (this[kPools].length === 0) { - throw new BalancedPoolMissingUpstreamError() - } - - const pool = this[kPools].find(pool => !pool[kNeedDrain]) || this[kPools][0] - - if (!pool.dispatch(opts, handler)) { - pool[kNeedDrain] = true - this[kNeedDrain] = true - } - - this[kPools].splice(this[kPools].indexOf(pool), 1) - this[kPools].push(pool) - - return !this[kNeedDrain] - } - removeUpstream (upstream) { - const pool = this[kPools].find((pool) => pool[kUpstream] === upstream) - const idx = this[kPools].indexOf(pool) - this[kPools].splice(idx, 1) + const pool = this[kClients].find((pool) => pool[kUpstream] === upstream) + const idx = this[kClients].indexOf(pool) + this[kClients].splice(idx, 1) pool.close() return this } get upstreams () { - return this[kPools].map((p) => p[kUpstream]) - } - - get destroyed () { - return this[kPools].reduce((acc, pool) => acc && pool.destroyed, true) + return this[kClients].map((p) => p[kUpstream]) } - get closed () { - return this[kPools].reduce((acc, pool) => acc && pool.closed, true) - } - - close (cb) { - const p = Promise.all(this[kPools].map((p) => p.close())) - - if (!cb) { - return p + [kDispatch] () { + // We validate that pools is greater than 0, + // otherwise we would have to wait until an upstream + // is added, which might never happen. + if (this[kClients].length === 0) { + throw new BalancedPoolMissingUpstreamError() } - p.then(() => process.nextTick(cb), (err) => process.nextTick(cb, err)) - } - - destroy (err, cb) { - const p = Promise.all(this[kPools].map((p) => p.destroy(err))) + let dispatcher = this[kClients].find(dispatcher => !dispatcher[kNeedDrain]) - if (!cb) { - return p + if (!dispatcher) { + return } - p.then(() => process.nextTick(cb)) + this[kClients].splice(this[kClients].indexOf(dispatcher), 1) + this[kClients].push(dispatcher) + + return dispatcher } } diff --git a/lib/pool-base.js b/lib/pool-base.js new file mode 100644 index 00000000000..a2b5e6285f9 --- /dev/null +++ b/lib/pool-base.js @@ -0,0 +1,223 @@ +'use strict' + +const Dispatcher = require('./dispatcher') +const { + ClientDestroyedError, + ClientClosedError, + InvalidArgumentError +} = require('./core/errors') +const FixedQueue = require('./node/fixed-queue') +const { kSize, kRunning, kPending, kBusy } = require('./core/symbols') + +const kClients = Symbol('clients') +const kNeedDrain = Symbol('needDrain') +const kQueue = Symbol('queue') +const kDestroyed = Symbol('destroyed') +const kClosedPromise = Symbol('closed promise') +const kClosedResolve = Symbol('closed resolve') +const kOnDrain = Symbol('onDrain') +const kOnConnect = Symbol('onConnect') +const kOnDisconnect = Symbol('onDisconnect') +const kOnConnectionError = Symbol('onConnectionError') +const kQueued = Symbol('queued') +const kDispatch = Symbol('dispatch') + +class PoolBase extends Dispatcher { + constructor () { + super() + + this[kQueue] = new FixedQueue() + this[kClosedPromise] = null + this[kClosedResolve] = null + this[kDestroyed] = false + this[kClients] = [] + this[kNeedDrain] = false + this[kQueued] = 0 + + const pool = this + + this[kOnDrain] = function onDrain (origin, targets) { + const queue = pool[kQueue] + + let needDrain = false + + while (!needDrain) { + const item = queue.shift() + if (!item) { + break + } + pool[kQueued]-- + needDrain = !this.dispatch(item.opts, item.handler) + } + + this[kNeedDrain] = needDrain + + if (!this[kNeedDrain] && pool[kNeedDrain]) { + pool[kNeedDrain] = false + pool.emit('drain', origin, [pool, ...targets]) + } + + if (pool[kClosedResolve] && queue.isEmpty()) { + Promise + .all(pool[kClients].map(c => c.close())) + .then(pool[kClosedResolve]) + } + } + + this[kOnConnect] = (origin, targets) => { + pool.emit('connect', origin, [pool, ...targets]) + } + + this[kOnDisconnect] = (origin, targets, err) => { + pool.emit('disconnect', origin, [pool, ...targets], err) + } + + this[kOnConnectionError] = (origin, targets, err) => { + pool.emit('connectionError', origin, [pool, ...targets], err) + } + } + + get [kBusy] () { + return this[kNeedDrain] + } + + get [kPending] () { + let ret = this[kQueued] + for (const { [kPending]: pending } of this[kClients]) { + ret += pending + } + return ret + } + + get [kRunning] () { + let ret = 0 + for (const { [kRunning]: running } of this[kClients]) { + ret += running + } + return ret + } + + get [kSize] () { + let ret = this[kQueued] + for (const { [kSize]: size } of this[kClients]) { + ret += size + } + return ret + } + + get destroyed () { + return this[kDestroyed] + } + + get closed () { + return this[kClosedPromise] != null + } + + close (cb) { + try { + if (this[kDestroyed]) { + throw new ClientDestroyedError() + } + + if (!this[kClosedPromise]) { + if (this[kQueue].isEmpty()) { + this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close())) + } else { + this[kClosedPromise] = new Promise((resolve) => { + this[kClosedResolve] = resolve + }) + } + this[kClosedPromise] = this[kClosedPromise].then(() => { + this[kDestroyed] = true + }) + } + + if (cb) { + this[kClosedPromise].then(() => cb(null, null)) + } else { + return this[kClosedPromise] + } + } catch (err) { + if (cb) { + cb(err) + } else { + return Promise.reject(err) + } + } + } + + destroy (err, cb) { + this[kDestroyed] = true + + if (typeof err === 'function') { + cb = err + err = null + } + + if (!err) { + err = new ClientDestroyedError() + } + + while (true) { + const item = this[kQueue].shift() + if (!item) { + break + } + item.handler.onError(err) + } + + const promise = Promise.all(this[kClients].map(c => c.destroy(err))) + if (cb) { + promise.then(() => cb(null, null)) + } else { + return promise + } + } + + dispatch (opts, handler) { + if (!handler || typeof handler !== 'object') { + throw new InvalidArgumentError('handler must be an object') + } + + try { + if (this[kDestroyed]) { + throw new ClientDestroyedError() + } + + if (this[kClosedPromise]) { + throw new ClientClosedError() + } + + const dispatcher = this[kDispatch]() + + if (!dispatcher) { + this[kNeedDrain] = true + this[kQueue].push({ opts, handler }) + this[kQueued]++ + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true + this[kNeedDrain] = this[kClients].some(dispatcher => !dispatcher[kNeedDrain]) + } + } catch (err) { + if (typeof handler.onError !== 'function') { + throw new InvalidArgumentError('invalid onError method') + } + + handler.onError(err) + } + + return !this[kNeedDrain] + } + +} + +module.exports = { + PoolBase, + kClients, + kNeedDrain, + kOnDrain, + kOnConnect, + kOnDisconnect, + kDispatch, + kOnConnectionError +} diff --git a/lib/pool.js b/lib/pool.js index a0a0a0db753..144d66e2d2d 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,38 +1,32 @@ 'use strict' -const Dispatcher = require('./dispatcher') +const { + PoolBase, + kClients, + kNeedDrain, + kOnDrain, + kOnConnect, + kOnDisconnect, + kDispatch, + kOnConnectionError + } = require('./pool-base') const Client = require('./client') const { - ClientClosedError, InvalidArgumentError, - ClientDestroyedError } = require('./core/errors') -const FixedQueue = require('./node/fixed-queue') const util = require('./core/util') -const { kSize, kRunning, kUrl, kPending, kBusy } = require('./core/symbols') -const assert = require('assert') +const { kUrl } = require('./core/symbols') const buildConnector = require('./core/connect') -const kClients = Symbol('clients') -const kNeedDrain = Symbol('needDrain') -const kQueue = Symbol('queue') -const kDestroyed = Symbol('destroyed') -const kClosedPromise = Symbol('closed promise') -const kClosedResolve = Symbol('closed resolve') const kOptions = Symbol('options') -const kOnDrain = Symbol('onDrain') -const kOnConnect = Symbol('onConnect') -const kOnDisconnect = Symbol('onDisconnect') -const kOnConnectionError = Symbol('onConnectionError') const kConnections = Symbol('connections') const kFactory = Symbol('factory') -const kQueued = Symbol('queued') function defaultFactory (origin, opts) { return new Client(origin, opts) } -class Pool extends Dispatcher { +class Pool extends PoolBase { constructor (origin, { connections, factory = defaultFactory, @@ -70,201 +64,26 @@ class Pool extends Dispatcher { this[kConnections] = connections || null this[kUrl] = util.parseOrigin(origin) this[kOptions] = { ...util.deepClone(options), connect } - this[kQueue] = new FixedQueue() - this[kClosedPromise] = null - this[kClosedResolve] = null - this[kDestroyed] = false - this[kClients] = [] - this[kNeedDrain] = false - this[kQueued] = 0 this[kFactory] = factory - - const pool = this - - this[kOnDrain] = function onDrain (url, targets) { - assert(pool[kUrl].origin === url.origin) - - const queue = pool[kQueue] - - let needDrain = false - - while (!needDrain) { - const item = queue.shift() - if (!item) { - break - } - pool[kQueued]-- - needDrain = !this.dispatch(item.opts, item.handler) - } - - this[kNeedDrain] = needDrain - - if (!this[kNeedDrain] && pool[kNeedDrain]) { - pool[kNeedDrain] = false - pool.emit('drain', origin, [pool, ...targets]) - } - - if (pool[kClosedResolve] && queue.isEmpty()) { - Promise - .all(pool[kClients].map(c => c.close())) - .then(pool[kClosedResolve]) - } - } - - this[kOnConnect] = (origin, targets) => { - pool.emit('connect', origin, [pool, ...targets]) - } - - this[kOnDisconnect] = (origin, targets, err) => { - pool.emit('disconnect', origin, [pool, ...targets], err) - } - - this[kOnConnectionError] = (origin, targets, err) => { - pool.emit('connectionError', origin, [pool, ...targets], err) - } - } - - get [kBusy] () { - return this[kNeedDrain] - } - - get [kPending] () { - let ret = this[kQueued] - for (const { [kPending]: pending } of this[kClients]) { - ret += pending - } - return ret - } - - get [kRunning] () { - let ret = 0 - for (const { [kRunning]: running } of this[kClients]) { - ret += running - } - return ret - } - - get [kSize] () { - let ret = this[kQueued] - for (const { [kSize]: size } of this[kClients]) { - ret += size - } - return ret } - get destroyed () { - return this[kDestroyed] - } + [kDispatch] () { + let dispatcher = this[kClients].find(dispatcher => !dispatcher[kNeedDrain]) - get closed () { - return this[kClosedPromise] != null - } - - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object') + if (dispatcher) { + return dispatcher } - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosedPromise]) { - throw new ClientClosedError() - } - - let dispatcher = this[kClients].find(dispatcher => !dispatcher[kNeedDrain]) - - if (!dispatcher) { - if (!this[kConnections] || this[kClients].length < this[kConnections]) { - dispatcher = this[kFactory](this[kUrl], this[kOptions]) - .on('drain', this[kOnDrain]) - .on('connect', this[kOnConnect]) - .on('disconnect', this[kOnDisconnect]) - .on('connectionError', this[kOnConnectionError]) - this[kClients].push(dispatcher) - } - } - - if (!dispatcher) { - this[kNeedDrain] = true - this[kQueue].push({ opts, handler }) - this[kQueued]++ - } else if (!dispatcher.dispatch(opts, handler)) { - dispatcher[kNeedDrain] = true - this[kNeedDrain] = this[kConnections] && this[kClients].length === this[kConnections] - } - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } - - handler.onError(err) + if (!this[kConnections] || this[kClients].length < this[kConnections]) { + dispatcher = this[kFactory](this[kUrl], this[kOptions]) + .on('drain', this[kOnDrain]) + .on('connect', this[kOnConnect]) + .on('disconnect', this[kOnDisconnect]) + .on('connectionError', this[kOnConnectionError]) + this[kClients].push(dispatcher) } - return !this[kNeedDrain] - } - - close (cb) { - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (!this[kClosedPromise]) { - if (this[kQueue].isEmpty()) { - this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close())) - } else { - this[kClosedPromise] = new Promise((resolve) => { - this[kClosedResolve] = resolve - }) - } - this[kClosedPromise] = this[kClosedPromise].then(() => { - this[kDestroyed] = true - }) - } - - if (cb) { - this[kClosedPromise].then(() => cb(null, null)) - } else { - return this[kClosedPromise] - } - } catch (err) { - if (cb) { - cb(err) - } else { - return Promise.reject(err) - } - } - } - - destroy (err, cb) { - this[kDestroyed] = true - - if (typeof err === 'function') { - cb = err - err = null - } - - if (!err) { - err = new ClientDestroyedError() - } - - while (true) { - const item = this[kQueue].shift() - if (!item) { - break - } - item.handler.onError(err) - } - - const promise = Promise.all(this[kClients].map(c => c.destroy(err))) - if (cb) { - promise.then(() => cb(null, null)) - } else { - return promise - } + return dispatcher } } diff --git a/test/balanced-pool.js b/test/balanced-pool.js index 28a63217531..473522ac9f2 100644 --- a/test/balanced-pool.js +++ b/test/balanced-pool.js @@ -102,10 +102,10 @@ test('connect/disconnect event(s)', (t) => { }) t.teardown(pool.close.bind(pool)) - pool.on('connect', (origin, [pool, client]) => { + pool.on('connect', (origin, [pool, pool2, client]) => { t.equal(client instanceof Client, true) }) - pool.on('disconnect', (origin, [pool, client], error) => { + pool.on('disconnect', (origin, [pool, pool2, client], error) => { t.ok(client instanceof Client) t.type(error, errors.InformationalError) t.equal(error.code, 'UND_ERR_INFO') diff --git a/test/pool.js b/test/pool.js index 0c6099549b9..6b7cd89ad21 100644 --- a/test/pool.js +++ b/test/pool.js @@ -380,7 +380,7 @@ test('busy', (t) => { }) }) t.equal(client[kPending], n) - t.equal(client[kBusy], n >= 2) + t.equal(client[kBusy], n > 2) t.equal(client[kSize], n) t.equal(client[kRunning], 0) }