Skip to content

Commit

Permalink
feat: goroutine exits immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
exfly committed Jul 25, 2021
1 parent 013e846 commit 12dd0ea
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 52 deletions.
26 changes: 26 additions & 0 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)

const (
Expand All @@ -57,6 +58,8 @@ var curMem uint64

// TestAntsPoolWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWaitToGetWorker(t *testing.T) {
defer goleak.VerifyNone(t)

var wg sync.WaitGroup
p, _ := NewPool(AntsSize)
defer p.Release()
Expand All @@ -77,6 +80,7 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) {
}

func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
defer goleak.VerifyNone(t)
var wg sync.WaitGroup
p, _ := NewPool(AntsSize, WithPreAlloc(true))
defer p.Release()
Expand All @@ -98,6 +102,7 @@ func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {

// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
defer goleak.VerifyNone(t)
var wg sync.WaitGroup
p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
demoPoolFunc(i)
Expand All @@ -118,6 +123,7 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
}

func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
defer goleak.VerifyNone(t)
var wg sync.WaitGroup
p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
demoPoolFunc(i)
Expand All @@ -139,6 +145,7 @@ func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {

// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
defer goleak.VerifyNone(t)
p, _ := NewPool(TestSize)
defer p.Release()

Expand All @@ -156,6 +163,7 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) {

// TestAntsPoolWithFuncGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
defer goleak.VerifyNone(t)
dur := 10
p, _ := NewPoolWithFunc(TestSize, demoPoolFunc)
defer p.Release()
Expand All @@ -173,6 +181,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
}

func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
defer goleak.VerifyNone(t)
dur := 10
p, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true))
defer p.Release()
Expand All @@ -194,6 +203,7 @@ func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
//-------------------------------------------------------------------------------------------

func TestNoPool(t *testing.T) {
defer goleak.VerifyNone(t)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
Expand All @@ -211,6 +221,7 @@ func TestNoPool(t *testing.T) {
}

func TestAntsPool(t *testing.T) {
defer goleak.VerifyNone(t)
defer Release()
var wg sync.WaitGroup
for i := 0; i < n; i++ {
Expand All @@ -236,6 +247,7 @@ func TestAntsPool(t *testing.T) {
//-------------------------------------------------------------------------------------------

func TestPanicHandler(t *testing.T) {
defer goleak.VerifyNone(t)
var panicCounter int64
var wg sync.WaitGroup
p0, err := NewPool(10, WithPanicHandler(func(p interface{}) {
Expand Down Expand Up @@ -268,6 +280,7 @@ func TestPanicHandler(t *testing.T) {
}

func TestPanicHandlerPreMalloc(t *testing.T) {
defer goleak.VerifyNone(t)
var panicCounter int64
var wg sync.WaitGroup
p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p interface{}) {
Expand Down Expand Up @@ -300,6 +313,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
}

func TestPoolPanicWithoutHandler(t *testing.T) {
defer goleak.VerifyNone(t)
p0, err := NewPool(10)
assert.NoErrorf(t, err, "create new pool failed: %v", err)
defer p0.Release()
Expand All @@ -316,6 +330,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
}

func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
defer goleak.VerifyNone(t)
p0, err := NewPool(10, WithPreAlloc(true))
assert.NoErrorf(t, err, "create new pool failed: %v", err)
defer p0.Release()
Expand All @@ -334,6 +349,7 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
}

func TestPurge(t *testing.T) {
defer goleak.VerifyNone(t)
p, err := NewPool(10)
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release()
Expand All @@ -349,6 +365,7 @@ func TestPurge(t *testing.T) {
}

func TestPurgePreMalloc(t *testing.T) {
defer goleak.VerifyNone(t)
p, err := NewPool(10, WithPreAlloc(true))
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release()
Expand All @@ -364,6 +381,7 @@ func TestPurgePreMalloc(t *testing.T) {
}

func TestNonblockingSubmit(t *testing.T) {
defer goleak.VerifyNone(t)
poolSize := 10
p, err := NewPool(poolSize, WithNonblocking(true))
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
Expand All @@ -388,6 +406,7 @@ func TestNonblockingSubmit(t *testing.T) {
}

func TestMaxBlockingSubmit(t *testing.T) {
defer goleak.VerifyNone(t)
poolSize := 10
p, err := NewPool(poolSize, WithMaxBlockingTasks(1))
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
Expand Down Expand Up @@ -426,6 +445,7 @@ func TestMaxBlockingSubmit(t *testing.T) {
}

func TestNonblockingSubmitWithFunc(t *testing.T) {
defer goleak.VerifyNone(t)
poolSize := 10
var wg sync.WaitGroup
p, err := NewPoolWithFunc(poolSize, func(i interface{}) {
Expand All @@ -450,6 +470,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) {
}

func TestMaxBlockingSubmitWithFunc(t *testing.T) {
defer goleak.VerifyNone(t)
poolSize := 10
p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithMaxBlockingTasks(1))
assert.NoError(t, err, "create TimingPool failed: %v", err)
Expand Down Expand Up @@ -485,6 +506,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
}

func TestRebootDefaultPool(t *testing.T) {
defer goleak.VerifyNone(t)
defer Release()
Reboot()
var wg sync.WaitGroup
Expand All @@ -503,6 +525,7 @@ func TestRebootDefaultPool(t *testing.T) {
}

func TestRebootNewPool(t *testing.T) {
defer goleak.VerifyNone(t)
var wg sync.WaitGroup
p, err := NewPool(10)
assert.NoErrorf(t, err, "create Pool failed: %v", err)
Expand Down Expand Up @@ -538,6 +561,7 @@ func TestRebootNewPool(t *testing.T) {
}

func TestInfinitePool(t *testing.T) {
defer goleak.VerifyNone(t)
c := make(chan struct{})
p, _ := NewPool(-1)
_ = p.Submit(func() {
Expand All @@ -564,6 +588,7 @@ func TestInfinitePool(t *testing.T) {
}

func TestInfinitePoolWithFunc(t *testing.T) {
defer goleak.VerifyNone(t)
c := make(chan struct{})
p, _ := NewPoolWithFunc(-1, func(i interface{}) {
demoPoolFunc(i)
Expand Down Expand Up @@ -591,6 +616,7 @@ func TestInfinitePoolWithFunc(t *testing.T) {
}

func TestRestCodeCoverage(t *testing.T) {
defer goleak.VerifyNone(t)
_, err := NewPool(-1, WithExpiryDuration(-1))
t.Log(err)
_, err = NewPool(1, WithExpiryDuration(-1))
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ go 1.14
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.4.0
go.uber.org/goleak v1.1.10 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/tools v0.1.5 // indirect
gopkg.in/yaml.v2 v2.2.7 // indirect
)
38 changes: 38 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,51 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
51 changes: 31 additions & 20 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,36 +59,44 @@ type Pool struct {
blockingNum int

options *Options

done chan struct{}
}

// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *Pool) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()

for range heartbeat.C {
if p.IsClosed() {
break
}
loop:
for {
select {
case <-p.done:
return
case <-heartbeat.C:
if p.IsClosed() {
break loop
}

p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the p.lock, since w.task
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}
// Notify obsolete workers to stop.
// This notification must be outside the p.lock, since w.task
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}

// There might be a situation that all workers have been cleaned up(no any worker is running)
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers.
if p.Running() == 0 {
p.cond.Broadcast()
// There might be a situation that all workers have been cleaned up(no any worker is running)
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers.
if p.Running() == 0 {
p.cond.Broadcast()
}
}
}
}
Expand All @@ -115,6 +123,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
done: make(chan struct{}),
}
p.workerCache.New = func() interface{} {
return &goWorker{
Expand Down Expand Up @@ -191,6 +200,7 @@ func (p *Pool) Release() {
atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock()
p.workers.reset()
close(p.done)
p.lock.Unlock()
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
// those callers blocking infinitely.
Expand All @@ -200,6 +210,7 @@ func (p *Pool) Release() {
// Reboot reboots a closed pool.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
p.done = make(chan struct{})
go p.purgePeriodically()
}
}
Expand Down

0 comments on commit 12dd0ea

Please sign in to comment.