Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submitting task supports passing in arguments #234

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@ import (

var sum int32

func myAdd(i int32) {
atomic.AddInt32(&sum, i)
fmt.Printf("run with %d\n", i)
}

func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
myAdd(n)
}

func demoFunc() {
Expand All @@ -111,7 +115,25 @@ func main() {
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")

// Use the pool with a function,
// Use the common pool, with args.
calcFunc := func(args ...interface{}) {
i := args[0].(int32)
myAdd(i)
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.SubmitWithArgs(calcFunc, int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500 {
panic("the final result is wrong!!!")
}
atomic.SwapInt32(&sum, 0)

// Use the pool with a method,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
Expand All @@ -126,6 +148,9 @@ func main() {
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500 {
panic("the final result is wrong!!!")
}
}
```

Expand Down
5 changes: 5 additions & 0 deletions ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@
return defaultAntsPool.Submit(task)
}

// Submit submits a task with arguments to pool.
func SubmitWithArgs(task func(args ...interface{}), args ...interface{}) error {
return defaultAntsPool.SubmitWithArgs(task, args...)

Check warning on line 109 in ants.go

View check run for this annotation

Codecov / codecov/patch

ants.go#L108-L109

Added lines #L108 - L109 were not covered by tests
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultAntsPool.Running()
Expand Down
26 changes: 24 additions & 2 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ import (

var sum int32

func myAdd(i int32) {
atomic.AddInt32(&sum, i)
fmt.Printf("run with %d\n", i)
}

func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
myAdd(n)
}

func demoFunc() {
Expand All @@ -63,6 +67,24 @@ func main() {
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")

// Use the common pool, with args.
calcFunc := func(args ...interface{}) {
i := args[0].(int32)
myAdd(i)
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.SubmitWithArgs(calcFunc, int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500 {
panic("the final result is wrong!!!")
}
atomic.SwapInt32(&sum, 0)

// Use the pool with a method,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
Expand Down
11 changes: 9 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
task: make(chan *goTask, workerChanCap),
}
}
if p.options.PreAlloc {
Expand Down Expand Up @@ -164,14 +164,21 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {
return p.SubmitWithArgs(func(args ...interface{}) {
task()
})
}

// Submit submits a task with arguments to this pool.
func (p *Pool) SubmitWithArgs(task func(args ...interface{}), args ...interface{}) error {
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
w.task <- task
w.task <- &goTask{task: task, args: args}
return nil
}

Expand Down
10 changes: 8 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
"time"
)

// goTask is the task with a function and arguments
type goTask struct {
task func(args ...interface{})
args []interface{}
}

// goWorker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
Expand All @@ -35,7 +41,7 @@ type goWorker struct {
pool *Pool

// task is a job should be done.
task chan func()
task chan *goTask

// recycleTime will be updated when putting a worker back into queue.
recycleTime time.Time
Expand Down Expand Up @@ -67,7 +73,7 @@ func (w *goWorker) run() {
if f == nil {
return
}
f()
f.task(f.args...)
if ok := w.pool.revertWorker(w); !ok {
return
}
Expand Down