Skip to content

Commit

Permalink
Vault-1983: Use fairsharing to distribute workers between queues (has…
Browse files Browse the repository at this point in the history
…hicorp#11789)

* prelim fairshare prototype, untested and prototype status

* add tests for new fairshare infra - this likely fails tests for being racy

* probably fix races for code and test

* one more lock to fix for races

* fairsharing queue work distribution, tests, fixes, etc

* comment, shorten wait time

* typos and comments

* fix inverted worker count logic

* Update helper/fairshare/jobmanager.go

typo

* Update helper/fairshare/jobmanager.go

clarify comment

* move back to round robin between queues

* improvements from self review

* add job manager stress test
  • Loading branch information
swayne275 authored and jartek committed Sep 11, 2021
1 parent 5a69912 commit c29b7ca
Show file tree
Hide file tree
Showing 4 changed files with 428 additions and 73 deletions.
183 changes: 137 additions & 46 deletions helper/fairshare/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"container/list"
"fmt"
"io/ioutil"
"math"
"sync"
"time"

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
Expand All @@ -13,37 +15,33 @@ import (
"github.com/hashicorp/vault/sdk/helper/logging"
)

/*
Future Work:
- track workers per queue. this will involve things like:
- somehow wrap the Execute/OnFailure functions to increment counter when
they start running, and decrement when they stop running
-- put a queue.IncrementCounter() call at the beginning
-- call the provided work function in the middle
-- put a queue.DecrementCounter() call at the end
- job has a queueID or reference to the queue
- queue only removed when empty AND no workers
*/

// JobManager holds a set of per-queue job lists and a shared worker pool,
// and tracks how many workers are currently busy on each queue so that
// work can be handed out round-robin between queues.
type JobManager struct {
	name string

	// queues maps a queue ID to the list of jobs waiting in that queue
	queues map[string]*list.List

	quit    chan struct{}
	newWork chan struct{} // must be buffered

	workerPool *dispatcher

	// workerCount maps a queue ID to the number of workers active on it
	workerCount map[string]int

	onceStart sync.Once
	onceStop  sync.Once

	logger log.Logger

	// totalJobs is the number of jobs currently waiting across all queues
	totalJobs  int
	metricSink *metricsutil.ClusterMetricSink

	// waitgroup for testing stop functionality
	wg sync.WaitGroup

	// protects `queues`, `workerCount`, `queuesIndex`, `lastQueueAccessed`
	l sync.RWMutex

	// track queues by index for round robin worker assignment
	queuesIndex       []string
	lastQueueAccessed int
}

// NewJobManager creates a job manager, with an optional name
Expand All @@ -66,13 +64,14 @@ func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metric
j := JobManager{
name: name,
queues: make(map[string]*list.List),
queuesIndex: make([]string, 0),
lastQueueAccessed: -1,
quit: make(chan struct{}),
newWork: make(chan struct{}, 1),
workerPool: wp,
workerCount: make(map[string]int),
logger: l,
metricSink: metricSink,
queuesIndex: make([]string, 0),
lastQueueAccessed: -1,
}

j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
Expand Down Expand Up @@ -138,11 +137,12 @@ func (j *JobManager) GetPendingJobCount() int {

// GetWorkerCounts returns a map of queue ID to number of active workers.
//
// The result is a snapshot copied while holding the read lock. Handing out
// the internal map itself would let callers read it after the lock is
// released, racing with incrementWorkerCount / decrementWorkerCount.
func (j *JobManager) GetWorkerCounts() map[string]int {
	j.l.RLock()
	defer j.l.RUnlock()

	out := make(map[string]int, len(j.workerCount))
	for queueID, count := range j.workerCount {
		out[queueID] = count
	}

	return out
}

// GetWorkQueueLengths() returns a map of queue ID to number of active workers
// GetWorkQueueLengths() returns a map of queue ID to number of jobs in the queue
func (j *JobManager) GetWorkQueueLengths() map[string]int {
out := make(map[string]int)

Expand All @@ -156,20 +156,23 @@ func (j *JobManager) GetWorkQueueLengths() map[string]int {
return out
}

// getNextJob grabs the next job to be processed and prunes empty queues
func (j *JobManager) getNextJob() Job {
// getNextJob pops the next job to be processed and prunes empty queues
// it also returns the ID of the queue the job is associated with
func (j *JobManager) getNextJob() (Job, string) {
j.l.Lock()
defer j.l.Unlock()

if len(j.queues) == 0 {
return nil
return nil, ""
}

j.lastQueueAccessed = (j.lastQueueAccessed + 1) % len(j.queuesIndex)
queueID := j.queuesIndex[j.lastQueueAccessed]
queueID, canAssignWorker := j.getNextQueue()
if !canAssignWorker {
return nil, ""
}

jobElement := j.queues[queueID].Front()
out := j.queues[queueID].Remove(jobElement)
jobRaw := j.queues[queueID].Remove(jobElement)

j.totalJobs--

Expand All @@ -179,10 +182,81 @@ func (j *JobManager) getNextJob() Job {
}

if j.queues[queueID].Len() == 0 {
// we remove the empty queue, but we don't remove the worker count
// in case we are still working on previous jobs from this queue.
// worker count cleanup is handled in j.decrementWorkerCount
j.removeLastQueueAccessed()
}

return out.(Job)
return jobRaw.(Job), queueID
}

// getNextQueue returns the ID of the next queue to assign work from, and a
// bool reporting whether any queue can currently take another worker.
// queues are visited in round-robin order starting after j.lastQueueAccessed,
// and each queue is checked at most once per call. when an eligible queue is
// found, j.lastQueueAccessed is updated to that queue's index; otherwise it is
// left untouched and ("", false) is returned.
// note: this must be called with j.l held
func (j *JobManager) getNextQueue() (string, bool) {
	// with no queues tracked there is nothing to assign, and computing the
	// next index would take a modulus by zero and panic
	if len(j.queuesIndex) == 0 {
		return "", false
	}

	// ensure we loop through all existing queues until we find an eligible
	// queue, if one exists.
	queueIdx := j.nextQueueIndex(j.lastQueueAccessed)
	for i := 0; i < len(j.queuesIndex); i++ {
		potentialQueueID := j.queuesIndex[queueIdx]

		if !j.queueWorkersSaturated(potentialQueueID) {
			j.lastQueueAccessed = queueIdx
			return potentialQueueID, true
		}

		queueIdx = j.nextQueueIndex(queueIdx)
	}

	return "", false
}

// nextQueueIndex gives the index that follows currentIdx in round-robin
// order over j.queuesIndex: currentIdx+1, reduced modulo the number of
// tracked queues.
// note: this must be called with j.l held
// note: j.queuesIndex must be non-empty, the modulus would otherwise be zero
func (j *JobManager) nextQueueIndex(currentIdx int) int {
	numQueues := len(j.queuesIndex)
	following := currentIdx + 1

	return following % numQueues
}

// queueWorkersSaturated reports whether the given queue already has as many
// workers as it is allowed. the per-queue limit is
// ceil(0.9 * pool size / number of active queues), and the queue counts as
// saturated once its tracked worker count reaches that limit.
// note: this must be called with j.l held (at least for read).
// note: we may want to eventually factor in queue length relative to num queues
func (j *JobManager) queueWorkersSaturated(queueID string) bool {
	poolSize := float64(j.workerPool.numWorkers)
	activeQueues := float64(len(j.queues))

	perQueueLimit := int(math.Ceil(0.9 * poolSize / activeQueues))
	busyOnQueue := j.workerCount[queueID]

	return busyOnQueue >= perQueueLimit
}

// incrementWorkerCount records one more active worker for queueID.
// it takes j.l for write itself, so it must not be called with j.l held.
func (j *JobManager) incrementWorkerCount(queueID string) {
	j.l.Lock()
	defer j.l.Unlock()

	current := j.workerCount[queueID]
	j.workerCount[queueID] = current + 1
}

// decrementWorkerCount records one fewer active worker for queueID.
// if the queue itself no longer exists and no workers remain on it, the
// worker tracking entry for that queue is dropped as well.
// it takes j.l for write itself, so it must not be called with j.l held.
func (j *JobManager) decrementWorkerCount(queueID string) {
	j.l.Lock()
	defer j.l.Unlock()

	remaining := j.workerCount[queueID] - 1
	j.workerCount[queueID] = remaining

	// keep tracking while the queue still exists or workers are still busy
	if _, queueExists := j.queues[queueID]; queueExists || remaining >= 1 {
		return
	}

	delete(j.workerCount, queueID)
}

// assignWork continually loops, checking for new jobs and dispatching them to the
Expand All @@ -203,39 +277,56 @@ func (j *JobManager) assignWork() {
default:
}

job := j.getNextJob()
job, queueID := j.getNextJob()
if job != nil {
j.workerPool.dispatch(job)
j.workerPool.dispatch(job,
func() {
j.incrementWorkerCount(queueID)
},
func() {
j.decrementWorkerCount(queueID)
})
} else {
break
}
}

// listen for a wake-up when an empty job manager has been given
// new work
select {
case <-j.quit:
j.wg.Done()
return
case <-j.newWork:
break
// listen for wake-up when an empty job manager has been given work
case <-time.After(50 * time.Millisecond):
// periodically check if new workers can be assigned. with the
// fairsharing worker distribution it can be the case that there
// is work waiting, but no queues are eligible for another worker
}
}
}()
}

// addQueue creates the job list for `queueID` and registers it in the
// round-robin index, unless a queue with that ID already exists.
// it also makes sure a worker count entry exists for the queue.
// note: this must be called with j.l held for write
func (j *JobManager) addQueue(queueID string) {
	_, queueExists := j.queues[queueID]
	if !queueExists {
		j.queues[queueID] = list.New()
		j.queuesIndex = append(j.queuesIndex, queueID)
	}

	// a queue can run out of work and be pruned while workers are still busy
	// with jobs that came from it, and those workers are still being counted.
	// only start the count at zero when nothing is tracked, so re-creating
	// the queue does not wipe out the count of workers still running.
	_, tracked := j.workerCount[queueID]
	if !tracked {
		j.workerCount[queueID] = 0
	}
}

// removeLastQueueAccessed removes the queue and index map for the last queue
// accessed. It is to be used when the last queue accessed has emptied.
// note: this must be called with l held for write
// removes the queue and index tracker for the last queue accessed.
// it is to be used when the last queue accessed has emptied.
// note: this must be called with j.l held.
func (j *JobManager) removeLastQueueAccessed() {
if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 {
j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed)
Expand Down

0 comments on commit c29b7ca

Please sign in to comment.