Merge pull request #28 from alitto/bug/purge-after-channel-closed-2
Allow concurrent calls to StopAndWait & other fixes
alitto committed Mar 12, 2022
2 parents 23a42e4 + b52e832 commit 286125e
Showing 10 changed files with 133 additions and 76 deletions.
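
The headline change is making StopAndWait safe to call from multiple goroutines at once. Below is a minimal sketch of the usage this enables, mirroring the TestConcurrentStopAndWait test added in this commit; pool sizes and task counts are illustrative.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/alitto/pond"
)

func main() {
    // One worker, buffered queue of 5 tasks
    pool := pond.New(1, 5)

    var done int32
    for i := 0; i < 10; i++ {
        pool.Submit(func() {
            atomic.AddInt32(&done, 1)
        })
    }

    // Several goroutines may now call StopAndWait concurrently;
    // the tasks channel is guaranteed to be closed only once.
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            pool.StopAndWait()
        }()
    }
    wg.Wait()

    fmt.Println("completed tasks:", atomic.LoadInt32(&done))
}
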
2 changes: 1 addition & 1 deletion benchmark/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/benchmark
go 1.17

require (
github.com/alitto/pond v1.6.1
github.com/alitto/pond v1.7.0
github.com/gammazero/workerpool v1.1.2
github.com/panjf2000/ants/v2 v2.4.7
)
2 changes: 1 addition & 1 deletion examples/dynamic_size/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/dynamic_size
go 1.17

require (
github.com/alitto/pond v1.6.1
github.com/alitto/pond v1.7.0
)

replace github.com/alitto/pond => ../../
2 changes: 1 addition & 1 deletion examples/fixed_size/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/fixed_size
go 1.17

require (
github.com/alitto/pond v1.6.1
github.com/alitto/pond v1.7.0
)

replace github.com/alitto/pond => ../../
2 changes: 1 addition & 1 deletion examples/pool_context/go.mod
@@ -2,6 +2,6 @@ module github.com/alitto/pond/examples/pool_context

go 1.17

require github.com/alitto/pond v1.6.1
require github.com/alitto/pond v1.7.0

replace github.com/alitto/pond => ../../
2 changes: 1 addition & 1 deletion examples/prometheus/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/fixed_size
go 1.17

require (
github.com/alitto/pond v1.6.1
github.com/alitto/pond v1.7.0
github.com/prometheus/client_golang v1.9.0
)

2 changes: 1 addition & 1 deletion examples/task_group/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/task_group
go 1.17

require (
github.com/alitto/pond v1.6.1
github.com/alitto/pond v1.7.0
)

replace github.com/alitto/pond => ../../
131 changes: 68 additions & 63 deletions pond.go
@@ -89,11 +89,11 @@ type WorkerPool struct {
failedTaskCount uint64
// Private properties
tasks chan func()
tasksCloseOnce sync.Once
workersWaitGroup sync.WaitGroup
tasksWaitGroup sync.WaitGroup
purgerDoneChan chan struct{}
mutex sync.Mutex
stopped bool
stopped int32
}

// New creates a worker pool that can scale up to the given maximum number of workers (maxWorkers).
@@ -139,7 +139,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
pool.tasks = make(chan func(), pool.maxCapacity)

// Start purger goroutine
pool.purgerDoneChan = make(chan struct{})
pool.workersWaitGroup.Add(1)
go pool.purge()

// Start minWorkers workers
@@ -212,7 +212,7 @@ func (p *WorkerPool) CompletedTasks() uint64 {

// Stopped returns true if the pool has been stopped and is no longer accepting tasks, and false otherwise.
func (p *WorkerPool) Stopped() bool {
return p.stopped
return atomic.LoadInt32(&p.stopped) == 1
}

// Submit sends a task to this worker pool for execution. If the queue is full,
@@ -230,15 +230,15 @@ func (p *WorkerPool) TrySubmit(task func()) bool {

func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {
if task == nil {
return false
return
}

if p.Stopped() {
// Pool is stopped and caller must submit the task
if mustSubmit {
panic(ErrSubmitOnStoppedPool)
}
return false
return
}

// Increment submitted and waiting task counters as soon as we receive a task
@@ -255,35 +255,19 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {
}
}()

runningWorkerCount := p.RunningWorkers()

// Attempt to dispatch to an idle worker without blocking
if runningWorkerCount > 0 && p.IdleWorkers() > 0 {
select {
case p.tasks <- task:
submitted = true
return
default:
// No idle worker available, continue
}
}

// Start a worker as long as we haven't reached the limit
if runningWorkerCount < p.maxWorkers {
if ok := p.maybeStartWorker(task); ok {
submitted = true
return
}
if submitted = p.maybeStartWorker(task); submitted {
return
}

if !mustSubmit {
// Attempt to dispatch to an idle worker without blocking
select {
case p.tasks <- task:
submitted = true
return
default:
// Channel is full and can't wait for an idle worker, so need to exit
submitted = false
return
}
}
@@ -330,26 +314,27 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
})
}

// Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks.
// Tasks being processed by workers will continue until completion unless the process is terminated.
// Stop causes this pool to stop accepting new tasks and signals all workers to exit.
// Tasks being executed by workers will continue until completion (unless the process is terminated).
// Tasks in the queue will not be executed.
func (p *WorkerPool) Stop() {
go p.stop()
go p.stop(false)
}

// StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue
// to complete before returning.
func (p *WorkerPool) StopAndWait() {
p.stop()
p.stop(true)
}

// StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning
// or until the given deadline is reached, whichever comes first.
// StopAndWaitFor stops this pool and waits until either all tasks in the queue are completed
// or the given deadline is reached, whichever comes first.
func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {

// Launch goroutine to detect when worker pool has stopped gracefully
workersDone := make(chan struct{})
go func() {
p.stop()
p.stop(true)
workersDone <- struct{}{}
}()

@@ -363,29 +348,30 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
}
}

func (p *WorkerPool) stop() {
func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
// Mark pool as stopped
p.stopped = true
atomic.StoreInt32(&p.stopped, 1)

// Wait for all queued tasks to complete
p.tasksWaitGroup.Wait()
if waitForQueuedTasksToComplete {
// Wait for all queued tasks to complete
p.tasksWaitGroup.Wait()
}

// Terminate all workers & purger goroutine
p.contextCancel()

// Wait for all workers to exit
// Wait for all workers & purger goroutine to exit
p.workersWaitGroup.Wait()

// Wait purger goroutine to exit
<-p.purgerDoneChan

// close tasks channel
close(p.tasks)
// close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made)
p.tasksCloseOnce.Do(func() {
close(p.tasks)
})
}

// purge represents the work done by the purger goroutine
func (p *WorkerPool) purge() {
defer func() { p.purgerDoneChan <- struct{}{} }()
defer p.workersWaitGroup.Done()

idleTicker := time.NewTicker(p.idleTimeout)
defer idleTicker.Stop()
@@ -409,15 +395,20 @@ func (p *WorkerPool) stopIdleWorker() {
}
}

// startWorkers creates new worker goroutines to run the given tasks
// maybeStartWorker attempts to create a new worker goroutine to run the given task.
// If the worker pool has reached the maximum number of workers or there are idle workers,
// it will not create a new worker.
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {

// Attempt to increment worker count
if ok := p.incrementWorkerCount(); !ok {
return false
}

// Launch worker
if firstTask == nil {
atomic.AddInt32(&p.idleWorkerCount, 1)
}

// Launch worker goroutine
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)

return true
@@ -446,33 +437,46 @@ func (p *WorkerPool) executeTask(task func()) {
atomic.AddUint64(&p.successfulTaskCount, 1)
}

func (p *WorkerPool) incrementWorkerCount() bool {
func (p *WorkerPool) incrementWorkerCount() (incremented bool) {

// Attempt to increment worker count
p.mutex.Lock()
runningWorkerCount := p.RunningWorkers()
// Execute the resizing strategy to determine if we can create more workers
if !p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) || runningWorkerCount >= p.maxWorkers {
p.mutex.Unlock()
return false

// Reached max workers, do not create a new one
if runningWorkerCount >= p.maxWorkers {
return
}
atomic.AddInt32(&p.workerCount, 1)
p.mutex.Unlock()

// Increment waiting group semaphore
p.workersWaitGroup.Add(1)
// Idle workers available, do not create a new one
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
return
}

return true
p.mutex.Lock()
defer p.mutex.Unlock()

// Execute the resizing strategy to determine if we can create more workers
incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers)

if incremented {
// Increment worker count
atomic.AddInt32(&p.workerCount, 1)

// Increment wait group
p.workersWaitGroup.Add(1)
}

return
}

func (p *WorkerPool) decrementWorkerCount() {

// Decrement worker count
p.mutex.Lock()
defer p.mutex.Unlock()

// Decrement worker count
atomic.AddInt32(&p.workerCount, -1)
p.mutex.Unlock()

// Decrement waiting group semaphore
// Decrement wait group
p.workersWaitGroup.Done()
}

@@ -497,9 +501,10 @@ func worker(context context.Context, firstTask func(), tasks <-chan func(), idle
// We have received a task, execute it
if firstTask != nil {
taskExecutor(firstTask)

// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)

for {
select {
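Two idioms in the pond.go changes above carry the concurrency fix: the stopped flag becomes an atomic int32, and close(p.tasks) is wrapped in a sync.Once so overlapping stop() calls cannot double-close the channel. Here is a standalone sketch of that pattern, using illustrative names rather than pond's actual types.

package main

import (
    "sync"
    "sync/atomic"
)

// closer owns a channel that may be shut down from many goroutines.
type closer struct {
    stopped   int32     // set to 1 once Stop has been called
    closeOnce sync.Once // guards the single close of ch
    ch        chan func()
}

func newCloser() *closer {
    return &closer{ch: make(chan func(), 8)}
}

func (c *closer) Stop() {
    // Mark as stopped; atomic so readers need no lock.
    atomic.StoreInt32(&c.stopped, 1)

    // Safe under concurrent calls: only the first one closes the channel.
    c.closeOnce.Do(func() {
        close(c.ch)
    })
}

func (c *closer) Stopped() bool {
    return atomic.LoadInt32(&c.stopped) == 1
}

func main() {
    c := newCloser()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Stop() // no panic, regardless of how many goroutines race here
        }()
    }
    wg.Wait()
}
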
53 changes: 53 additions & 0 deletions pond_blackbox_test.go
@@ -3,6 +3,7 @@ package pond_test
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
@@ -568,3 +569,55 @@ func TestSubmitWithContext(t *testing.T) {
assertEqual(t, int32(1), atomic.LoadInt32(&taskCount))
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
}

func TestConcurrentStopAndWait(t *testing.T) {

pool := pond.New(1, 5)

// Submit tasks
var doneCount int32
for i := 0; i < 10; i++ {
pool.Submit(func() {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&doneCount, 1)
})
}

wg := sync.WaitGroup{}

for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()

pool.StopAndWait()
assertEqual(t, int32(10), atomic.LoadInt32(&doneCount))
}()
}

wg.Wait()
}

func TestSubmitToIdleWorker(t *testing.T) {

pool := pond.New(6, 0, pond.MinWorkers(3))

assertEqual(t, 3, pool.RunningWorkers())

// Submit task
var doneCount int32
for i := 0; i < 3; i++ {
pool.Submit(func() {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&doneCount, 1)
})
}

// Verify no new workers were started
assertEqual(t, 3, pool.RunningWorkers())

// Wait until all submitted tasks complete
pool.StopAndWait()

assertEqual(t, int32(3), atomic.LoadInt32(&doneCount))
}
2 changes: 1 addition & 1 deletion pond_test.go
@@ -43,6 +43,6 @@ func TestPurgeAfterPoolStopped(t *testing.T) {
assertEqual(t, 1, pool.RunningWorkers())

// Simulate purger goroutine attempting to stop a worker after tasks channel is closed
pool.stopped = true
atomic.StoreInt32(&pool.stopped, 1)
pool.stopIdleWorker()
}
