Skip to content

Commit

Permalink
opt: awake blocking goroutines more precisely in purgeStaleWorkers
Browse files Browse the repository at this point in the history
Fixes #272
  • Loading branch information
panjf2000 committed Apr 15, 2023
1 parent 73defa0 commit b32591f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
17 changes: 6 additions & 11 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
break
}

var isDormant bool
p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock()

// Notify obsolete workers to stop.
Expand All @@ -104,10 +107,8 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
}

// There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up,
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers.
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
// while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
if isDormant && p.Waiting() > 0 {
p.cond.Broadcast()
}
}
Expand Down Expand Up @@ -365,14 +366,8 @@ func (p *Pool) retrieveWorker() (w worker) {
return
}

var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
spawnWorker()
return
}
if w = p.workers.detach(); w == nil {
if nw < p.Cap() {
if p.Free() > 0 {
p.lock.Unlock()
spawnWorker()
return
Expand Down
17 changes: 6 additions & 11 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
break
}

var isDormant bool
p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock()

// Notify obsolete workers to stop.
Expand All @@ -105,10 +108,8 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
}

// There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up,
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers.
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
// while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
if isDormant && p.Waiting() > 0 {
p.cond.Broadcast()
}
}
Expand Down Expand Up @@ -371,14 +372,8 @@ func (p *PoolWithFunc) retrieveWorker() (w worker) {
return
}

var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
spawnWorker()
return
}
if w = p.workers.detach(); w == nil {
if nw < p.Cap() {
if p.Free() > 0 {
p.lock.Unlock()
spawnWorker()
return
Expand Down

0 comments on commit b32591f

Please sign in to comment.