Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow NewPoolWithFunc can invoke with nil argument #167

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,26 @@ type Logger interface {
// Printf must have the same semantics as log.Printf.
Printf(format string, args ...interface{})
}
type Task interface {
Do()
}

type TaskFn func()

func (fn TaskFn) Do() {
fn()
}

// Submit submits a task to pool.
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
}

// SubmitTask submits a task to pool.
func SubmitTask(task Task) error {
return defaultAntsPool.SubmitTask(task)
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultAntsPool.Running()
Expand Down
38 changes: 38 additions & 0 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package ants
import (
"log"
"os"
"reflect"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -55,6 +56,43 @@ const (

var curMem uint64

// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWithFuncNilParam(t *testing.T) {
var old1 interface{} = &struct{ a byte }{}
var old2 interface{} = &struct{ a byte }{}
assert.True(t, old1 != old2)

isStopArg := func(v interface{}) bool { return v == stopArg }

var a1 interface{} = &struct{ a byte }{}
var a2 interface{} = &struct{ a byte }{}
var a3 interface{} = &struct{ a byte }{}
assert.False(t, a1 == a2)
assert.False(t, isStopArg(a1))
assert.False(t, isStopArg(a2))
assert.False(t, isStopArg(a3))
assert.True(t, isStopArg(stopArg))

var wg sync.WaitGroup
var isNilCount int
p, _ := NewPoolWithFunc(1, func(i interface{}) {
if i == nil || reflect.ValueOf(i).IsNil() {
isNilCount++
}
wg.Done()
})
defer p.Release()

wg.Add(2)
_ = p.Invoke(nil)
_ = p.Invoke((*PoolWithFunc)(nil))

wg.Wait()

assert.Equal(t, 2, isNilCount)
t.Logf("pool with func, running workers number:%d", p.Running())
}

// TestAntsPoolWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWaitToGetWorker(t *testing.T) {
var wg sync.WaitGroup
Expand Down
7 changes: 6 additions & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
task: make(chan Task, workerChanCap),
}
}
if p.options.PreAlloc {
Expand All @@ -143,6 +143,11 @@ func NewPool(size int, options ...Option) (*Pool, error) {

// Submit submits a task to this pool.
func (p *Pool) Submit(task func()) error {
return p.SubmitTask(TaskFn(task))
}

// SubmitTask submits a task to this pool.
func (p *Pool) SubmitTask(task Task) error {
if p.IsClosed() {
return ErrPoolClosed
}
Expand Down
6 changes: 4 additions & 2 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type PoolWithFunc struct {
options *Options
}

var stopArg interface{} = &struct{ a byte }{}

// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgePeriodically() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
Expand Down Expand Up @@ -95,7 +97,7 @@ func (p *PoolWithFunc) purgePeriodically() {
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
for i, w := range expiredWorkers {
w.args <- nil
w.args <- stopArg
expiredWorkers[i] = nil
}

Expand Down Expand Up @@ -209,7 +211,7 @@ func (p *PoolWithFunc) Release() {
p.lock.Lock()
idleWorkers := p.workers
for _, w := range idleWorkers {
w.args <- nil
w.args <- stopArg
}
p.workers = nil
p.lock.Unlock()
Expand Down
5 changes: 3 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type goWorker struct {
pool *Pool

// task is a job should be done.
task chan func()
task chan Task

// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
Expand Down Expand Up @@ -67,7 +67,8 @@ func (w *goWorker) run() {
if f == nil {
return
}
f()

f.Do()
if ok := w.pool.revertWorker(w); !ok {
return
}
Expand Down
2 changes: 1 addition & 1 deletion worker_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (w *goWorkerWithFunc) run() {
}()

for args := range w.args {
if args == nil {
if args == stopArg {
return
}
w.pool.poolFunc(args)
Expand Down