Skip to content

Commit

Permalink
refactor: refine the code in retrieveWorker to make it more readable (
Browse files Browse the repository at this point in the history
  • Loading branch information
POABOB committed Sep 17, 2023
1 parent 1ce8146 commit aee9c2e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 72 deletions.
62 changes: 26 additions & 36 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,51 +333,41 @@ func (p *Pool) addWaiting(delta int) {

// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w worker) {
spawnWorker := func() {
p.lock.Lock()

retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}

// If the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(*goWorker)
w.run()
return
}

p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}

p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)
return
}

if p.IsClosed() {
p.lock.Unlock()
return
}
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)

if w = p.workers.detach(); w == nil {
if p.Free() > 0 {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
if p.IsClosed() {
p.lock.Unlock()
return
}
return

goto retry
}

// revertWorker puts a worker back into free pool, recycling the goroutines.
Expand Down
62 changes: 26 additions & 36 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,51 +339,41 @@ func (p *PoolWithFunc) addWaiting(delta int) {

// retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() (w worker) {
spawnWorker := func() {
p.lock.Lock()

retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}

// If the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(*goWorkerWithFunc)
w.run()
return
}

p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}

p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)
return
}

if p.IsClosed() {
p.lock.Unlock()
return
}
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)

if w = p.workers.detach(); w == nil {
if p.Free() > 0 {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
if p.IsClosed() {
p.lock.Unlock()
return
}
return

goto retry
}

// revertWorker puts a worker back into free pool, recycling the goroutines.
Expand Down

0 comments on commit aee9c2e

Please sign in to comment.