Skip to content

Commit

Permalink
feat: use weighted round robin in balancedPool (nodejs#1069)
Browse files Browse the repository at this point in the history
* fixes

* more fixes

* add test

* remove console.log

* rename startingWeightPerServer to maxWeightPerServer

* add another test
  • Loading branch information
jodevsa authored and metcoder95 committed Dec 26, 2022
1 parent e2a5e49 commit 44b8177
Show file tree
Hide file tree
Showing 2 changed files with 390 additions and 11 deletions.
85 changes: 81 additions & 4 deletions lib/balanced-pool.js
Expand Up @@ -18,6 +18,17 @@ const { parseOrigin } = require('./core/util')
const kFactory = Symbol('factory')

const kOptions = Symbol('options')
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
const kCurrentWeight = Symbol('kCurrentWeight')
const kIndex = Symbol('kIndex')
const kWeight = Symbol('kWeight')
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
const kErrorPenalty = Symbol('kErrorPenalty')

function getGreatestCommonDivisor (a, b) {
if (b === 0) return a
return getGreatestCommonDivisor(b, a % b)
}

function defaultFactory (origin, opts) {
return new Pool(origin, opts)
Expand All @@ -28,6 +39,11 @@ class BalancedPool extends PoolBase {
super()

this[kOptions] = opts
this[kIndex] = -1
this[kCurrentWeight] = 0

this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
this[kErrorPenalty] = this[kOptions].errorPenalty || 15

if (!Array.isArray(upstreams)) {
upstreams = [upstreams]
Expand All @@ -42,6 +58,7 @@ class BalancedPool extends PoolBase {
for (const upstream of upstreams) {
this.addUpstream(upstream)
}
this._updateBalancedPoolStats()
}

addUpstream (upstream) {
Expand All @@ -54,12 +71,40 @@ class BalancedPool extends PoolBase {
))) {
return this
}
const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

this[kAddClient](pool)
pool.on('connect', () => {
pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
})

pool.on('connectionError', () => {
pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
this._updateBalancedPoolStats()
})

pool.on('disconnect', (...args) => {
const err = args[2]
if (err && err.code === 'UND_ERR_SOCKET') {
// decrease the weight of the pool.
pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
this._updateBalancedPoolStats()
}
})

for (const client of this[kClients]) {
client[kWeight] = this[kMaxWeightPerServer]
}

this[kAddClient](this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions])))
this._updateBalancedPoolStats()

return this
}

_updateBalancedPoolStats () {
this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0)
}

removeUpstream (upstream) {
const upstreamOrigin = parseOrigin(upstream).origin

Expand Down Expand Up @@ -100,10 +145,42 @@ class BalancedPool extends PoolBase {
return
}

this[kClients].splice(this[kClients].indexOf(dispatcher), 1)
this[kClients].push(dispatcher)
const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true)

if (allClientsBusy) {
return
}

let counter = 0

let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain])

while (counter++ < this[kClients].length) {
this[kIndex] = (this[kIndex] + 1) % this[kClients].length
const pool = this[kClients][this[kIndex]]

// find pool index with the largest weight
if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) {
maxWeightIndex = this[kIndex]
}

// decrease the current weight every `this[kClients].length`.
if (this[kIndex] === 0) {
// Set the current weight to the next lower weight.
this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

if (this[kCurrentWeight] <= 0) {
this[kCurrentWeight] = this[kMaxWeightPerServer]
}
}
if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) {
return pool
}
}

return dispatcher
this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
this[kIndex] = maxWeightIndex
return this[kClients][maxWeightIndex]
}
}

Expand Down

0 comments on commit 44b8177

Please sign in to comment.