Skip to content

Commit

Permalink
Add option to turn off automatically purge
Browse files Browse the repository at this point in the history
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
  • Loading branch information
czs007 committed Sep 27, 2022
1 parent 06e6934 commit e6d80bd
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
48 changes: 48 additions & 0 deletions ants_test.go
Expand Up @@ -23,6 +23,7 @@
package ants

import (
"fmt"
"log"
"os"
"runtime"
Expand Down Expand Up @@ -563,6 +564,53 @@ func TestInfinitePool(t *testing.T) {
}
}

func TestWithDisablePurge(t *testing.T) {
numWorker := 2
p, _ := NewPool(numWorker, WithDisablePurge(true))
_ = p.Submit(func() {
fmt.Println("work1")
time.Sleep(time.Second)
})

_ = p.Submit(func() {
fmt.Println("work2")
time.Sleep(time.Second)
})

if n := p.Running(); n != 2 {
t.Errorf("expect 2 workers running, but got %d", n)
}

if n := p.Free(); n != 0 {
t.Errorf("expect zero of free workers, but got %d", n)
}

p.Tune(10)
if capacity := p.Cap(); capacity != 10 {
t.Fatalf("expect capacity: 10 but got %d", capacity)
}

_ = p.Submit(func() {
fmt.Println("work3")
time.Sleep(time.Second)
})

if n := p.Running(); n != 3 {
t.Errorf("expect 3 workers running, but got %d", n)
}

if n := p.Free(); n != 7 {
t.Errorf("expect 7 of free workers, but got %d", n)
}

p.Release()
p.Reboot()
if n := p.Running(); n != 3 {
t.Errorf("expect 3 workers running, but got %d", n)
}

}

func TestInfinitePoolWithFunc(t *testing.T) {
c := make(chan struct{})
p, _ := NewPoolWithFunc(-1, func(i interface{}) {
Expand Down
11 changes: 11 additions & 0 deletions options.go
Expand Up @@ -39,6 +39,10 @@ type Options struct {
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
Logger Logger

// When DisablePurge is false, a separate goroutine is started and periodically purges expired workers.
// default is false.
DisablePurge bool
}

// WithOptions accepts the whole options config.
Expand Down Expand Up @@ -89,3 +93,10 @@ func WithLogger(logger Logger) Option {
opts.Logger = logger
}
}

// WithDisablePurge indicates whether we turn off automatically purge.
func WithDisablePurge(disable bool) Option {
return func(opts *Options) {
opts.DisablePurge = disable
}
}
9 changes: 6 additions & 3 deletions pool.go
Expand Up @@ -150,8 +150,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)

if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
}
return p, nil
}

Expand Down Expand Up @@ -259,7 +260,9 @@ func (p *Pool) Reboot() {
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
}
}
}

Expand Down

0 comments on commit e6d80bd

Please sign in to comment.