Skip to content

Commit

Permalink
feat: implement pool.Waiting() API
Browse files Browse the repository at this point in the history
Fixes #157
  • Loading branch information
panjf2000 committed May 7, 2022
1 parent 607d039 commit 9310acd
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 43 deletions.
47 changes: 27 additions & 20 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type Pool struct {
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool

// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32

heartbeatDone int32
stopHeartbeat context.CancelFunc
Expand Down Expand Up @@ -97,10 +97,11 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
expiredWorkers[i] = nil
}

// There might be a situation that all workers have been cleaned up(no any worker is running)
// There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up,
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers.
if p.Running() == 0 {
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast()
}
}
Expand Down Expand Up @@ -174,12 +175,12 @@ func (p *Pool) Submit(task func()) error {
return nil
}

// Running returns the amount of the currently running goroutines.
// Running returns the number of workers currently running.
func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}

// Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited.
// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
func (p *Pool) Free() int {
c := p.Cap()
if c < 0 {
Expand All @@ -188,6 +189,11 @@ func (p *Pool) Free() int {
return c - p.Running()
}

// Waiting returns the number of tasks which are waiting be executed.
func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}

// Cap returns the capacity of this pool.
func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
Expand Down Expand Up @@ -259,14 +265,12 @@ func (p *Pool) Reboot() {

// ---------------------------------------------------------------------------

// incRunning increases the number of the currently running goroutines.
func (p *Pool) incRunning() {
atomic.AddInt32(&p.running, 1)
func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}

// decRunning decreases the number of the currently running goroutines.
func (p *Pool) decRunning() {
atomic.AddInt32(&p.running, -1)
func (p *Pool) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}

// retrieveWorker returns an available worker to run the tasks.
Expand All @@ -292,30 +296,33 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.blockingNum--
p.addWaiting(-1)

if p.IsClosed() {
p.lock.Unlock()
return
}

var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
spawnWorker()
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
if nw < p.Cap() {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}

p.lock.Unlock()
}
return
Expand Down
46 changes: 27 additions & 19 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type PoolWithFunc struct {
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool

// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int
// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
waiting int32

heartbeatDone int32
stopHeartbeat context.CancelFunc
Expand Down Expand Up @@ -112,10 +112,11 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
expiredWorkers[i] = nil
}

// There might be a situation that all workers have been cleaned up(no worker is running)
// There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up,
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers.
if p.Running() == 0 {
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast()
}
}
Expand Down Expand Up @@ -191,12 +192,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
return nil
}

// Running returns the amount of the currently running goroutines.
// Running returns the number of workers currently running.
func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running))
}

// Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited.
// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
func (p *PoolWithFunc) Free() int {
c := p.Cap()
if c < 0 {
Expand All @@ -205,6 +206,11 @@ func (p *PoolWithFunc) Free() int {
return c - p.Running()
}

// Waiting returns the number of tasks which are waiting be executed.
func (p *PoolWithFunc) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}

// Cap returns the capacity of this pool.
func (p *PoolWithFunc) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
Expand Down Expand Up @@ -280,14 +286,12 @@ func (p *PoolWithFunc) Reboot() {

//---------------------------------------------------------------------------

// incRunning increases the number of the currently running goroutines.
func (p *PoolWithFunc) incRunning() {
atomic.AddInt32(&p.running, 1)
func (p *PoolWithFunc) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}

// decRunning decreases the number of the currently running goroutines.
func (p *PoolWithFunc) decRunning() {
atomic.AddInt32(&p.running, -1)
func (p *PoolWithFunc) addWaiting(delta int) {
atomic.AddInt32(&p.waiting, int32(delta))
}

// retrieveWorker returns an available worker to run the tasks.
Expand Down Expand Up @@ -316,24 +320,28 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.blockingNum--
p.addWaiting(-1)

if p.IsClosed() {
p.lock.Unlock()
return
}

var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
spawnWorker()
return
}
l := len(p.workers) - 1
if l < 0 {
if nw < capacity {
if nw < p.Cap() {
p.lock.Unlock()
spawnWorker()
return
Expand Down
4 changes: 2 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type goWorker struct {
// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorker) run() {
w.pool.incRunning()
w.pool.addRunning(1)
go func() {
defer func() {
w.pool.decRunning()
w.pool.addRunning(-1)
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
Expand Down
4 changes: 2 additions & 2 deletions worker_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type goWorkerWithFunc struct {
// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorkerWithFunc) run() {
w.pool.incRunning()
w.pool.addRunning(1)
go func() {
defer func() {
w.pool.decRunning()
w.pool.addRunning(-1)
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
Expand Down
1 change: 1 addition & 0 deletions worker_loop_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !windows
// +build !windows

package ants
Expand Down
1 change: 1 addition & 0 deletions worker_stack_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !windows
// +build !windows

package ants
Expand Down

0 comments on commit 9310acd

Please sign in to comment.