add support for Run(ctx) interface #246

Open · wants to merge 5 commits into base: master
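For context, the diff below exercises a `Runner` interface and a `NewPoolWithRunner` constructor whose definitions live in files not included in this excerpt. A minimal sketch of what the tests imply; the interface method is confirmed by the diff, while the constructor's return type name is an assumption:

```go
package ants

import "context"

// Sketch inferred from the tests in this PR; not the PR's actual source file.
// A Runner is a task object that the pool executes, receiving a context.
type Runner interface {
	Run(ctx context.Context)
}

// The constructor exercised throughout the diff. Its concrete return type is
// not visible in this excerpt, so the name below is a placeholder assumption.
// func NewPoolWithRunner(size int, options ...Option) (*PoolWithRunner, error)
```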
23 changes: 23 additions & 0 deletions README.md
@@ -92,6 +92,16 @@ func demoFunc() {
	fmt.Println("Hello World!")
}

type run struct {
	i  int32
	wg *sync.WaitGroup
}

func (r *run) Run(ctx context.Context) {
	myFunc(r.i)
	r.wg.Done()
}

func main() {
	defer ants.Release()

@@ -126,6 +136,19 @@ func main() {
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)

	// Use the pool with a runner,
	// with a capacity of 10 and the default one-second expiry duration.
	pr, _ := ants.NewPoolWithRunner(10)
	defer pr.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = pr.Invoke(&run{i: int32(i), wg: &wg})
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", pr.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
}
```
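The README snippet above depends on `myFunc`, `sum`, `wg`, and `runTimes`, which are declared earlier in the README and not shown in this hunk. For reference, a self-contained version of the runner example; the import path and the bodies of the supporting declarations are assumptions recalled from the surrounding README:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2" // assumed import path; build against this PR's branch
)

var sum int32

// myFunc mirrors the task function defined earlier in the README:
// it adds an int32 payload to a shared counter.
func myFunc(i interface{}) {
	atomic.AddInt32(&sum, i.(int32))
}

type run struct {
	i  int32
	wg *sync.WaitGroup
}

func (r *run) Run(ctx context.Context) {
	myFunc(r.i)
	r.wg.Done()
}

func main() {
	defer ants.Release()

	const runTimes = 1000
	var wg sync.WaitGroup

	// NewPoolWithRunner is added by this PR and is not in released ants,
	// so this program only compiles against the PR branch.
	pr, _ := ants.NewPoolWithRunner(10)
	defer pr.Release()

	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = pr.Invoke(&run{i: int32(i), wg: &wg})
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", pr.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
}
```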

215 changes: 215 additions & 0 deletions ants_test.go
@@ -23,6 +23,7 @@
package ants

import (
"context"
"log"
"os"
"runtime"
@@ -117,6 +118,53 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
	t.Logf("memory usage:%d MB", curMem)
}

type run struct {
	i   interface{}
	wg  *sync.WaitGroup
	c   chan struct{}
	log *testing.T
}

func (r *run) Run(ctx context.Context) {
	demoPoolFunc(r.i)
	if r.wg != nil {
		r.wg.Done()
	}
	if r.c != nil {
		<-r.c
	}
	if r.log != nil {
		r.log.Log("do task", r.i)
		time.Sleep(1 * time.Second)
	}
}

// Compile-time assertion that *run implements the Runner interface.
var _ Runner = &run{}

// TestAntsPoolWithRunnerWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWithRunnerWaitToGetWorker(t *testing.T) {
	var wg sync.WaitGroup
	r := &run{
		i:  0,
		wg: &wg,
	}
	p, _ := NewPoolWithRunner(AntsSize)
	defer p.Release()

	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = p.Invoke(r)
	}
	wg.Wait()
	t.Logf("pool with runner, running workers number:%d", p.Running())
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	curMem = mem.TotalAlloc/MiB - curMem
	t.Logf("memory usage:%d MB", curMem)
}

func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
	var wg sync.WaitGroup
	p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
@@ -137,6 +185,27 @@ func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
	t.Logf("memory usage:%d MB", curMem)
}

func TestAntsPoolWithRunnerWaitToGetWorkerPreMalloc(t *testing.T) {
	var wg sync.WaitGroup
	r := &run{
		i:  0,
		wg: &wg,
	}
	p, _ := NewPoolWithRunner(AntsSize, WithPreAlloc(true))
	defer p.Release()

	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = p.Invoke(r)
	}
	wg.Wait()
	t.Logf("pool with runner, running workers number:%d", p.Running())
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	curMem = mem.TotalAlloc/MiB - curMem
	t.Logf("memory usage:%d MB", curMem)
}

// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
	p, _ := NewPool(TestSize)
@@ -172,6 +241,26 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
	t.Logf("memory usage:%d MB", curMem)
}

// TestAntsPoolWithRunnerGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolWithRunnerGetWorkerFromCache(t *testing.T) {
	r := &run{
		i: 1,
	}
	p, _ := NewPoolWithRunner(TestSize)
	defer p.Release()

	for i := 0; i < AntsSize; i++ {
		_ = p.Invoke(r)
	}
	time.Sleep(2 * DefaultCleanIntervalTime)
	_ = p.Invoke(r)
	t.Logf("pool with runner, running workers number:%d", p.Running())
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	curMem = mem.TotalAlloc/MiB - curMem
	t.Logf("memory usage:%d MB", curMem)
}

func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
	dur := 10
	p, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true))
@@ -189,6 +278,25 @@ func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
	t.Logf("memory usage:%d MB", curMem)
}

func TestAntsPoolWithRunnerGetWorkerFromCachePreMalloc(t *testing.T) {
	r := &run{
		i: 10,
	}
	p, _ := NewPoolWithRunner(TestSize, WithPreAlloc(true))
	defer p.Release()

	for i := 0; i < AntsSize; i++ {
		_ = p.Invoke(r)
	}
	time.Sleep(2 * DefaultCleanIntervalTime)
	_ = p.Invoke(r)
	t.Logf("pool with runner, running workers number:%d", p.Running())
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	curMem = mem.TotalAlloc/MiB - curMem
	t.Logf("memory usage:%d MB", curMem)
}

//-------------------------------------------------------------------------------------------
// Contrast between goroutines without a pool and goroutines with ants pool.
//-------------------------------------------------------------------------------------------
@@ -267,6 +375,15 @@ func TestPanicHandler(t *testing.T) {
	assert.EqualValues(t, 0, p1.Running(), "pool should be empty after panic")
}

type runPanic struct{}

func (r *runPanic) Run(ctx context.Context) {
	panic("Oops!")
}

var _ Runner = &runPanic{}

func TestPanicHandlerPreMalloc(t *testing.T) {
	var panicCounter int64
	var wg sync.WaitGroup
@@ -297,6 +414,19 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
	c = atomic.LoadInt64(&panicCounter)
	assert.EqualValuesf(t, 2, c, "panic handler didn't work, panicCounter: %d", c)
	assert.EqualValues(t, 0, p1.Running(), "pool should be empty after panic")
	r := &runPanic{}
	p2, err := NewPoolWithRunner(10, WithPanicHandler(func(p interface{}) {
		defer wg.Done()
		atomic.AddInt64(&panicCounter, 1)
	}))
	assert.NoErrorf(t, err, "create new pool with runner failed: %v", err)
	defer p2.Release()
	wg.Add(1)
	_ = p2.Invoke(r)
	wg.Wait()
	c = atomic.LoadInt64(&panicCounter)
	assert.EqualValuesf(t, 3, c, "panic handler didn't work, panicCounter: %d", c)
	assert.EqualValues(t, 0, p2.Running(), "pool should be empty after panic")
}

func TestPoolPanicWithoutHandler(t *testing.T) {
@@ -313,6 +443,11 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
	assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
	defer p1.Release()
	_ = p1.Invoke("Oops!")

	r := &runPanic{}
	p2, err := NewPoolWithRunner(10)
	assert.NoErrorf(t, err, "create new pool with runner failed: %v", err)
	defer p2.Release()
	_ = p2.Invoke(r)
}

func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
@@ -331,6 +466,12 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {

	defer p1.Release()
	_ = p1.Invoke("Oops!")

	r := &runPanic{}
	p2, err := NewPoolWithRunner(10, WithPreAlloc(true))
	assert.NoErrorf(t, err, "create new pool with runner failed: %v", err)
	defer p2.Release()
	_ = p2.Invoke(r)
}

func TestPurge(t *testing.T) {
@@ -346,6 +487,14 @@ func TestPurge(t *testing.T) {
	_ = p1.Invoke(1)
	time.Sleep(3 * DefaultCleanIntervalTime)
	assert.EqualValues(t, 0, p.Running(), "all p should be purged")

	r := &run{i: 10}
	p2, err := NewPoolWithRunner(10)
	assert.NoErrorf(t, err, "create TimingPoolWithRunner failed: %v", err)
	defer p2.Release()
	_ = p2.Invoke(r)
	time.Sleep(3 * DefaultCleanIntervalTime)
	assert.EqualValues(t, 0, p2.Running(), "all p should be purged")
}

func TestPurgePreMalloc(t *testing.T) {
@@ -361,6 +510,13 @@ func TestPurgePreMalloc(t *testing.T) {
	_ = p1.Invoke(1)
	time.Sleep(3 * DefaultCleanIntervalTime)
	assert.EqualValues(t, 0, p.Running(), "all p should be purged")

	r := &run{i: 10}
	p2, err := NewPoolWithRunner(10, WithPreAlloc(true))
	assert.NoErrorf(t, err, "create TimingPoolWithRunner failed: %v", err)
	defer p2.Release()
	_ = p2.Invoke(r)
	time.Sleep(3 * DefaultCleanIntervalTime)
	assert.EqualValues(t, 0, p2.Running(), "all p should be purged")
}

func TestNonblockingSubmit(t *testing.T) {
@@ -590,6 +746,30 @@ func TestInfinitePoolWithFunc(t *testing.T) {
	}
}

func TestInfinitePoolWithRunner(t *testing.T) {
	r := &run{i: 10, c: make(chan struct{})}
	p, _ := NewPoolWithRunner(-1)
	_ = p.Invoke(r)
	_ = p.Invoke(r)
	r.c <- struct{}{}
	r.c <- struct{}{}
	if n := p.Running(); n != 2 {
		t.Errorf("expect 2 workers running, but got %d", n)
	}
	if n := p.Free(); n != -1 {
		t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n)
	}
	p.Tune(10)
	if capacity := p.Cap(); capacity != -1 {
		t.Fatalf("expect capacity: -1 but got %d", capacity)
	}
	var err error
	_, err = NewPoolWithRunner(-1, WithPreAlloc(true))
	if err != ErrInvalidPreAllocSize {
		t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
	}
}

func TestReleaseWhenRunningPool(t *testing.T) {
	var wg sync.WaitGroup
	p, _ := NewPool(1)
@@ -665,6 +845,41 @@ func TestReleaseWhenRunningPoolWithFunc(t *testing.T) {
	wg.Wait()
}

func TestReleaseWhenRunningPoolWithRunner(t *testing.T) {
	var wg sync.WaitGroup

	p, _ := NewPoolWithRunner(1)
	wg.Add(2)
	go func() {
		t.Log("start aaa")
		defer func() {
			wg.Done()
			t.Log("stop aaa")
		}()
		for i := 0; i < 30; i++ {
			r := &run{i: i, log: t}
			_ = p.Invoke(r)
		}
	}()

	go func() {
		t.Log("start bbb")
		defer func() {
			wg.Done()
			t.Log("stop bbb")
		}()
		for i := 100; i < 130; i++ {
			r := &run{i: i, log: t}
			_ = p.Invoke(r)
		}
	}()

	time.Sleep(3 * time.Second)
	p.Release()
	t.Log("wait for all goroutines to exit...")
	wg.Wait()
}

func TestRestCodeCoverage(t *testing.T) {
	_, err := NewPool(-1, WithExpiryDuration(-1))
	t.Log(err)