Add option to turn off automatic purging #253

Merged: 1 commit on Oct 11, 2022
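
This PR adds a WithDisablePurge option so a pool keeps its workers resident instead of reclaiming idle ones. A minimal usage sketch of the new option, assuming the usual github.com/panjf2000/ants/v2 import path (the option itself is introduced by this PR, everything else is existing API):

package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// With purging disabled, idle workers are never reclaimed.
	p, err := ants.NewPool(4, ants.WithDisablePurge(true))
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := 0; i < 4; i++ {
		_ = p.Submit(func() { time.Sleep(10 * time.Millisecond) })
	}
	time.Sleep(50 * time.Millisecond)
	// All four workers remain resident even after going idle.
	fmt.Println("running:", p.Running(), "free:", p.Free())
}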
ants_test.go: 125 additions, 0 deletions
@@ -563,6 +563,131 @@ func TestInfinitePool(t *testing.T) {
}
}

func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int) {
sig := make(chan struct{})
wg := sync.WaitGroup{}

wg.Add(numWorker)
for i := 0; i < numWorker; i++ {
_ = p.Submit(func() {
wg.Done()
sig <- struct{}{}
})
}
wg.Wait()
runCnt := p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)

freeCnt := p.Free()
assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)

newCap := 10

p.Tune(newCap)
capacity := p.Cap()
assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity)
<-sig
time.Sleep(time.Millisecond * 10)

wg.Add(1)
_ = p.Submit(func() {
wg.Done()
sig <- struct{}{}
})
wg.Wait()

runCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)

<-sig
<-sig

freeCnt = p.Free()
assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect %d free workers, but got %d", newCap-numWorker, freeCnt)

p.Release()
p.Reboot()

runCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
}

func TestWithDisablePurge(t *testing.T) {
numWorker := 2
p, _ := NewPool(numWorker, WithDisablePurge(true))
testPoolWithDisablePurge(t, p, numWorker)
}

func TestWithDisablePurgeAndWithExpiration(t *testing.T) {
numWorker := 2
p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100))
testPoolWithDisablePurge(t, p, numWorker)
}

func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg *sync.WaitGroup, sig chan struct{}) {
wg.Add(numWorker)
for i := 0; i < numWorker; i++ {
_ = p.Invoke(i)
}
wg.Wait()
runCnt := p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)

freeCnt := p.Free()
assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)

newCap := 10
p.Tune(newCap)
capacity := p.Cap()
assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity)
<-sig

time.Sleep(time.Millisecond * 200)

wg.Add(1)
_ = p.Invoke(10)
wg.Wait()

runCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)

<-sig
<-sig

freeCnt = p.Free()
assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect %d free workers, but got %d", newCap-numWorker, freeCnt)

p.Release()
p.Reboot()

runCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
}

func TestPoolFuncWithDisablePurge(t *testing.T) {
numWorker := 2
sig := make(chan struct{})
wg := sync.WaitGroup{}

p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
wg.Done()
sig <- struct{}{}
}, WithDisablePurge(true))
testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig)
}

func TestPoolFuncWithDisablePurgeAndWithExpiration(t *testing.T) {
numWorker := 2
sig := make(chan struct{})
wg := sync.WaitGroup{}

p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
wg.Done()
sig <- struct{}{}
}, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100))
testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig)
}

func TestInfinitePoolWithFunc(t *testing.T) {
c := make(chan struct{})
p, _ := NewPoolWithFunc(-1, func(i interface{}) {
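The tests above all lean on one idiom: each task calls wg.Done() and then blocks sending on an unbuffered channel, which pins its worker in the running state until the test drains the channel. A stripped-down sketch of that idiom; the pinWorker name is illustrative, not part of the PR:

// Sketch of the blocking-task idiom used in the tests above.
func pinWorker(p *Pool) (release func(), err error) {
	sig := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	err = p.Submit(func() {
		wg.Done()         // the task has started on a worker
		sig <- struct{}{} // block until the caller receives from sig
	})
	if err != nil {
		return nil, err
	}
	wg.Wait() // the worker is now provably counted by p.Running()
	return func() { <-sig }, nil // draining sig lets the worker go idle
}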
options.go: 10 additions, 0 deletions
@@ -39,6 +39,9 @@ type Options struct {
// Logger is the customized logger for logging info. If it is not set,
// the default standard logger from the log package is used.
Logger Logger

// When DisablePurge is true, workers are never purged and remain resident.
DisablePurge bool
}

// WithOptions accepts the whole options config.
@@ -89,3 +92,10 @@ func WithLogger(logger Logger) Option {
opts.Logger = logger
}
}

// WithDisablePurge indicates whether to turn off the automatic purging of workers.
func WithDisablePurge(disable bool) Option {
return func(opts *Options) {
opts.DisablePurge = disable
}
}
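
WithDisablePurge follows the same functional-options pattern as WithLogger above: each With* helper returns a closure that sets one field on Options. For context, a minimal sketch of how the loadOptions helper called from NewPool and NewPoolWithFunc below presumably folds those closures together; the real implementation may differ in detail:

// Sketch: apply each Option closure to a fresh Options value.
func loadOptions(options ...Option) *Options {
	opts := new(Options)
	for _, option := range options {
		option(opts) // e.g. WithDisablePurge(true) sets opts.DisablePurge
	}
	return opts
}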
pool.go: 13 additions, 8 deletions
@@ -68,6 +68,7 @@ type Pool struct {
// purgePeriodically clears expired workers periodically; it runs in its own goroutine, as a scavenger.
func (p *Pool) purgePeriodically(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration)

defer func() {
heartbeat.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1)
@@ -83,7 +84,6 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
if p.IsClosed() {
break
}

p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
@@ -115,10 +115,12 @@ func NewPool(size int, options ...Option) (*Pool, error) {
size = -1
}

if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
if !opts.DisablePurge {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}

if opts.Logger == nil {
@@ -150,8 +152,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)

if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
}
return p, nil
}

@@ -259,7 +262,9 @@ func (p *Pool) Reboot() {
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
}
}
}

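NewPool and Reboot now gate the scavenger goroutine on DisablePurge (pool_func.go below mirrors this), which is also why the ExpiryDuration validation above can be skipped when purging is off. A condensed, standalone sketch of the ticker-plus-context pattern that purgePeriodically follows, with the pool-specific bookkeeping elided and the runScavenger name purely illustrative:

// Sketch of the scavenger loop: tick at the expiry interval, purge on
// each tick, and exit when Release cancels the context (stopHeartbeat).
func runScavenger(ctx context.Context, interval time.Duration, purge func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			purge() // retrieve expired workers and recycle them
		}
	}
}
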
pool_func.go: 12 additions, 7 deletions
@@ -134,10 +134,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {

opts := loadOptions(options...)

if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
if !opts.DisablePurge {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
}

if opts.Logger == nil {
@@ -167,8 +169,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)

if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
}
return p, nil
}

@@ -280,7 +283,9 @@ func (p *PoolWithFunc) Reboot() {
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
}
}
}

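For completeness, a runnable sketch of the new option on the function-pool variant, mirroring the tests above; the doubling task, pool size, and import path are illustrative assumptions rather than part of the PR:

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup
	// One function is bound to the pool; Invoke feeds it arguments.
	p, err := ants.NewPoolWithFunc(2, func(i interface{}) {
		defer wg.Done()
		fmt.Println(i.(int) * 2)
	}, ants.WithDisablePurge(true))
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := 0; i < 4; i++ {
		wg.Add(1)
		_ = p.Invoke(i)
	}
	wg.Wait()
	// With DisablePurge, both workers stay resident after the burst.
	fmt.Println("running:", p.Running())
}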