Skip to content

Commit

Permalink
opt: fall back to LeastTasks when RoundRobin can't find a worker (#306)
Browse files Browse the repository at this point in the history
Besides, update a few comments and add new benchmarks for multi-pool
  • Loading branch information
panjf2000 committed Nov 21, 2023
1 parent 19bd1ea commit fb82167
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 20 deletions.
30 changes: 30 additions & 0 deletions ants_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ func BenchmarkAntsPool(b *testing.B) {
}
}

func BenchmarkAntsMultiPool(b *testing.B) {
var wg sync.WaitGroup
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
_ = p.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
}
}

func BenchmarkGoroutinesThroughput(b *testing.B) {
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
Expand Down Expand Up @@ -170,3 +188,15 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
}
}
}

func BenchmarkAntsMultiPoolThroughput(b *testing.B) {
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck

b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
_ = p.Submit(demoFunc)
}
}
}
16 changes: 11 additions & 5 deletions multipool.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
// MultiPool consists of multiple pools, from which you will benefit the
// performance improvement on basis of the fine-grained locking that reduces
// the lock contention.
// MultiPool is a good fit for the scenario that you have a large number of
// MultiPool is a good fit for the scenario where you have a large number of
// tasks to submit, and you don't want the single pool to be the bottleneck.
type MultiPool struct {
pools []*Pool
Expand All @@ -70,8 +70,8 @@ func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...O
return &MultiPool{pools: pools, lbs: lbs}, nil
}

func (mp *MultiPool) next() (idx int) {
switch mp.lbs {
func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) {
switch lbs {
case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0
Expand All @@ -91,11 +91,17 @@ func (mp *MultiPool) next() (idx int) {
}

// Submit submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPool) Submit(task func()) error {
func (mp *MultiPool) Submit(task func()) (err error) {
if mp.IsClosed() {
return ErrPoolClosed
}

Check warning on line 97 in multipool.go

View check run for this annotation

Codecov / codecov/patch

multipool.go#L96-L97

Added lines #L96 - L97 were not covered by tests
return mp.pools[mp.next()].Submit(task)
if err = mp.pools[mp.next(mp.lbs)].Submit(task); err == nil {
return
}
if err == ErrPoolOverload && mp.lbs == RoundRobin {
return mp.pools[mp.next(LeastTasks)].Submit(task)
}
return

Check warning on line 104 in multipool.go

View check run for this annotation

Codecov / codecov/patch

multipool.go#L101-L104

Added lines #L101 - L104 were not covered by tests
}

// Running returns the number of the currently running workers across all pools.
Expand Down
17 changes: 12 additions & 5 deletions multipool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// MultiPoolWithFunc consists of multiple pools, from which you will benefit the
// performance improvement on basis of the fine-grained locking that reduces
// the lock contention.
// MultiPoolWithFunc is a good fit for the scenario that you have a large number of
// MultiPoolWithFunc is a good fit for the scenario where you have a large number of
// tasks to submit, and you don't want the single pool to be the bottleneck.
type MultiPoolWithFunc struct {
pools []*PoolWithFunc
Expand All @@ -59,8 +59,8 @@ func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadB
return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil
}

func (mp *MultiPoolWithFunc) next() (idx int) {
switch mp.lbs {
func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
switch lbs {
case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0
Expand All @@ -80,11 +80,18 @@ func (mp *MultiPoolWithFunc) next() (idx int) {
}

// Invoke submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPoolWithFunc) Invoke(args interface{}) error {
func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) {
if mp.IsClosed() {
return ErrPoolClosed
}

Check warning on line 86 in multipool_func.go

View check run for this annotation

Codecov / codecov/patch

multipool_func.go#L85-L86

Added lines #L85 - L86 were not covered by tests
return mp.pools[mp.next()].Invoke(args)

if err = mp.pools[mp.next(mp.lbs)].Invoke(args); err == nil {
return
}
if err == ErrPoolOverload && mp.lbs == RoundRobin {
return mp.pools[mp.next(LeastTasks)].Invoke(args)
}
return

Check warning on line 94 in multipool_func.go

View check run for this annotation

Codecov / codecov/patch

multipool_func.go#L91-L94

Added lines #L91 - L94 were not covered by tests
}

// Running returns the number of the currently running workers across all pools.
Expand Down
11 changes: 6 additions & 5 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync"
)

// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
// Pool accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
Expand Down Expand Up @@ -210,7 +211,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the latest
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {
Expand All @@ -230,7 +231,7 @@ func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}

// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
// Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *Pool) Free() int {
c := p.Cap()
if c < 0 {
Expand All @@ -239,7 +240,7 @@ func (p *Pool) Free() int {
return c - p.Running()
}

// Waiting returns the number of tasks which are waiting be executed.
// Waiting returns the number of tasks waiting to be executed.
func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
Expand Down Expand Up @@ -339,7 +340,7 @@ retry:
return
}

// If the worker queue is empty and we don't run out of the pool capacity,
// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync"
)

// PoolWithFunc accepts the tasks from client,
// PoolWithFunc accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
// capacity of the pool.
Expand Down Expand Up @@ -216,7 +216,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
// Invoke submits a task to pool.
//
// Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
// but what calls for special attention is that you will get blocked with the latest
// but what calls for special attention is that you will get blocked with the last
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
func (p *PoolWithFunc) Invoke(args interface{}) error {
Expand All @@ -236,7 +236,7 @@ func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running))
}

// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
// Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *PoolWithFunc) Free() int {
c := p.Cap()
if c < 0 {
Expand All @@ -245,7 +245,7 @@ func (p *PoolWithFunc) Free() int {
return c - p.Running()
}

// Waiting returns the number of tasks which are waiting be executed.
// Waiting returns the number of tasks waiting to be executed.
func (p *PoolWithFunc) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
Expand Down Expand Up @@ -345,7 +345,7 @@ retry:
return
}

// If the worker queue is empty and we don't run out of the pool capacity,
// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
Expand Down

0 comments on commit fb82167

Please sign in to comment.