Skip to content

Commit

Permalink
opt: cache current time for workders and update it periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 11, 2022
1 parent 03011bc commit 846d76a
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 25 deletions.
2 changes: 2 additions & 0 deletions ants.go
Expand Up @@ -88,6 +88,8 @@ var (
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)

const nowTimeUpdateInterval = 500 * time.Millisecond

// Logger is used for logging formatted messages.
type Logger interface {
// Printf must have the same semantics as log.Printf.
Expand Down
42 changes: 42 additions & 0 deletions ants_benchmark_test.go
Expand Up @@ -25,6 +25,7 @@ package ants
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -144,3 +145,44 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
}
b.StopTimer()
}

func BenchmarkTimeNow(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = time.Now()
}
}

func BenchmarkTimeNowCache(b *testing.B) {
var (
now atomic.Value
offset int32
)

now.Store(time.Now())
go func() {
for range time.Tick(500 * time.Millisecond) {
now.Store(time.Now())
atomic.StoreInt32(&offset, 0)
}
}()

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = now.Load().(time.Time).Add(time.Duration(atomic.AddInt32(&offset, 1)))
}
}

func BenchmarkTimeNowCache1(b *testing.B) {
var now atomic.Value
now.Store(time.Now())
go func() {
for range time.Tick(500 * time.Millisecond) {
now.Store(time.Now())
}
}()

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = now.Load().(time.Time)
}
}
52 changes: 39 additions & 13 deletions ants_test.go
Expand Up @@ -322,22 +322,50 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
_ = p1.Invoke("Oops!")
}

func TestPurge(t *testing.T) {
p, err := NewPool(10)
func TestPurgePool(t *testing.T) {
size := 500
ch := make(chan struct{})

p, err := NewPool(size)
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * DefaultCleanIntervalTime)
assert.EqualValues(t, 0, p.Running(), "all p should be purged")
p1, err := NewPoolWithFunc(10, demoPoolFunc)

for i := 0; i < size; i++ {
j := i + 1
_ = p.Submit(func() {
<-ch
d := j % 100
time.Sleep(time.Duration(d) * time.Millisecond)
})
}
assert.Equalf(t, size, p.Running(), "pool should be full, expected: %d, but got: %d", size, p.Running())

close(ch)
time.Sleep(5 * DefaultCleanIntervalTime)
assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running())

ch = make(chan struct{})
f := func(i interface{}) {
<-ch
d := i.(int) % 100
time.Sleep(time.Duration(d) * time.Millisecond)
}

p1, err := NewPoolWithFunc(size, f)
assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
defer p1.Release()
_ = p1.Invoke(1)
time.Sleep(3 * DefaultCleanIntervalTime)
assert.EqualValues(t, 0, p.Running(), "all p should be purged")

for i := 0; i < size; i++ {
_ = p1.Invoke(i)
}
assert.Equalf(t, size, p1.Running(), "pool should be full, expected: %d, but got: %d", size, p1.Running())

close(ch)
time.Sleep(5 * DefaultCleanIntervalTime)
assert.Equalf(t, 0, p1.Running(), "pool should be empty after purge, but got %d", p1.Running())
}

func TestPurgePreMalloc(t *testing.T) {
func TestPurgePreMallocPool(t *testing.T) {
p, err := NewPool(10, WithPreAlloc(true))
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release()
Expand Down Expand Up @@ -547,9 +575,7 @@ func TestInfinitePool(t *testing.T) {
}
var err error
_, err = NewPool(-1, WithPreAlloc(true))
if err != ErrInvalidPreAllocSize {
t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
}
assert.EqualErrorf(t, err, ErrInvalidPreAllocSize.Error(), "")
}

func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) {
Expand Down
32 changes: 26 additions & 6 deletions pool.go
Expand Up @@ -62,11 +62,13 @@ type Pool struct {
heartbeatDone int32
stopHeartbeat context.CancelFunc

now atomic.Value

options *Options
}

// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *Pool) purgePeriodically(ctx context.Context) {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration)

defer func() {
Expand All @@ -76,9 +78,9 @@ func (p *Pool) purgePeriodically(ctx context.Context) {

for {
select {
case <-heartbeat.C:
case <-ctx.Done():
return
case <-heartbeat.C:
}

if p.IsClosed() {
Expand Down Expand Up @@ -108,6 +110,20 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
}
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()

for range ticker.C {
p.now.Store(time.Now())
}
}

func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}

// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...)
Expand Down Expand Up @@ -154,8 +170,12 @@ func NewPool(size int, options ...Option) (*Pool, error) {
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}

p.now.Store(time.Now())
go p.ticktock()

return p, nil
}

Expand Down Expand Up @@ -264,7 +284,7 @@ func (p *Pool) Reboot() {
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}
}
}
Expand Down Expand Up @@ -340,7 +360,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool {
p.cond.Broadcast()
return false
}
worker.recycleTime = time.Now()
worker.recycleTime = p.nowTime()
p.lock.Lock()

// To avoid memory leaks, add a double check in the lock scope.
Expand Down
32 changes: 26 additions & 6 deletions pool_func.go
Expand Up @@ -64,11 +64,13 @@ type PoolWithFunc struct {
heartbeatDone int32
stopHeartbeat context.CancelFunc

now atomic.Value

options *Options
}

// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer func() {
heartbeat.Stop()
Expand All @@ -78,9 +80,9 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
var expiredWorkers []*goWorkerWithFunc
for {
select {
case <-heartbeat.C:
case <-ctx.Done():
return
case <-heartbeat.C:
}

if p.IsClosed() {
Expand Down Expand Up @@ -123,6 +125,20 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
}
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()

for range ticker.C {
p.now.Store(time.Now())
}
}

func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}

// NewPoolWithFunc generates an instance of ants pool with a specific function.
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
if size <= 0 {
Expand Down Expand Up @@ -171,8 +187,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}

p.now.Store(time.Now())
go p.ticktock()

return p, nil
}

Expand Down Expand Up @@ -285,7 +305,7 @@ func (p *PoolWithFunc) Reboot() {
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}
}
}
Expand Down Expand Up @@ -368,7 +388,7 @@ func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
p.cond.Broadcast()
return false
}
worker.recycleTime = time.Now()
worker.recycleTime = p.nowTime()
p.lock.Lock()

// To avoid memory leaks, add a double check in the lock scope.
Expand Down

0 comments on commit 846d76a

Please sign in to comment.