diff --git a/ants_test.go b/ants_test.go index 41b1566..d1e6378 100644 --- a/ants_test.go +++ b/ants_test.go @@ -23,6 +23,7 @@ package ants import ( + "fmt" "log" "os" "runtime" @@ -563,6 +564,53 @@ func TestInfinitePool(t *testing.T) { } } +func TestWithDisablePurge(t *testing.T) { + numWorker := 2 + p, _ := NewPool(numWorker, WithDisablePurge(true)) + _ = p.Submit(func() { + fmt.Println("work1") + time.Sleep(time.Second) + }) + + _ = p.Submit(func() { + fmt.Println("work2") + time.Sleep(time.Second) + }) + + if n := p.Running(); n != 2 { + t.Errorf("expect 2 workers running, but got %d", n) + } + + if n := p.Free(); n != 0 { + t.Errorf("expect zero of free workers, but got %d", n) + } + + p.Tune(10) + if capacity := p.Cap(); capacity != 10 { + t.Fatalf("expect capacity: 10 but got %d", capacity) + } + + _ = p.Submit(func() { + fmt.Println("work3") + time.Sleep(time.Second) + }) + + if n := p.Running(); n != 3 { + t.Errorf("expect 3 workers running, but got %d", n) + } + + if n := p.Free(); n != 7 { + t.Errorf("expect 7 of free workers, but got %d", n) + } + + p.Release() + p.Reboot() + if n := p.Running(); n != 3 { + t.Errorf("expect 3 workers running, but got %d", n) + } + +} + func TestInfinitePoolWithFunc(t *testing.T) { c := make(chan struct{}) p, _ := NewPoolWithFunc(-1, func(i interface{}) { diff --git a/options.go b/options.go index caa830b..7c16ad2 100644 --- a/options.go +++ b/options.go @@ -39,6 +39,10 @@ type Options struct { // Logger is the customized logger for logging info, if it is not set, // default standard logger from log package is used. Logger Logger + + // When DisablePurge is false, a separate goroutine is started and periodically purges expired workers. + // default is false. + DisablePurge bool } // WithOptions accepts the whole options config. @@ -89,3 +93,10 @@ func WithLogger(logger Logger) Option { opts.Logger = logger } } + +// WithDisablePurge indicates whether we turn off automatically purge. +func WithDisablePurge(disable bool) Option { + return func(opts *Options) { + opts.DisablePurge = disable + } +} diff --git a/pool.go b/pool.go index ddd1fc3..25a0c1f 100644 --- a/pool.go +++ b/pool.go @@ -150,8 +150,9 @@ func NewPool(size int, options ...Option) (*Pool, error) { // Start a goroutine to clean up expired workers periodically. var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - go p.purgePeriodically(ctx) - + if !p.options.DisablePurge { + go p.purgePeriodically(ctx) + } return p, nil } @@ -259,7 +260,9 @@ func (p *Pool) Reboot() { atomic.StoreInt32(&p.heartbeatDone, 0) var ctx context.Context ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - go p.purgePeriodically(ctx) + if !p.options.DisablePurge { + go p.purgePeriodically(ctx) + } } }