From 3968f4003c97cbe8f263aae3f543145cc02a25ff Mon Sep 17 00:00:00 2001 From: Alejandro Durante Date: Mon, 9 May 2022 19:06:05 -0300 Subject: [PATCH] Group context --- .github/workflows/main.yml | 2 +- README.md | 63 +++++++++++++- examples/dynamic_size/go.mod | 4 +- examples/fixed_size/go.mod | 4 +- examples/group_context/go.mod | 9 ++ examples/group_context/group_context.go | 46 ++++++++++ examples/pool_context/go.mod | 4 +- examples/prometheus/go.mod | 4 +- examples/task_group/go.mod | 4 +- go.mod | 2 +- group.go | 90 ++++++++++++++++++++ group_blackbox_test.go | 107 ++++++++++++++++++++++++ pond.go | 37 ++++---- pond_blackbox_test.go | 28 ------- 14 files changed, 338 insertions(+), 66 deletions(-) create mode 100644 examples/group_context/go.mod create mode 100644 examples/group_context/group_context.go create mode 100644 group.go create mode 100644 group_blackbox_test.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cb0a6ec..1e81163 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -10,7 +10,7 @@ jobs: name: Test strategy: matrix: - go-version: [1.15.x, 1.16.x, 1.17.x] + go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x] os: [ubuntu-latest, macos-latest, windows-latest] runs-on: ${{ matrix.os }} steps: diff --git a/README.md b/README.md index 7f24e91..1ccca42 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,8 @@ Some common scenarios include: - Submitting tasks to a pool in a fire-and-forget fashion - Submitting tasks to a pool and waiting for them to complete - Submitting tasks to a pool with a deadline - - Submitting a group of related tasks and waiting for them to complete + - Submitting a group of tasks and waiting for them to complete + - Submitting a group of tasks associated to a Context - Getting the number of running workers (goroutines) - Stopping a worker pool - Task panics are handled gracefully (configurable panic handler) @@ -104,7 +105,7 @@ func main() { } ``` -### Submitting groups of related tasks +### Submitting a group of tasks ``` go package main @@ -124,7 +125,7 @@ func main() { // Create a task group group := pool.Group() - // Submit a group of related tasks + // Submit a group of tasks for i := 0; i < 20; i++ { n := i group.Submit(func() { @@ -137,6 +138,59 @@ func main() { } ``` +### Submitting a group of tasks associated to a context (**since v1.8.0**) + +This feature provides synchronization, error propagation, and Context cancelation for subtasks of a common task. Similar to `errgroup.Group` from [`golang.org/x/sync/errgroup`](https://pkg.go.dev/golang.org/x/sync/errgroup) package with concurrency bounded by the worker pool. + +``` go +package main + +import ( + "context" + "fmt" + "net/http" + + "github.com/alitto/pond" +) + +func main() { + + // Create a worker pool + pool := pond.New(10, 1000) + defer pool.StopAndWait() + + // Create a task group associated to a context + group, ctx := pool.GroupContext(context.Background()) + + var urls = []string{ + "https://www.golang.org/", + "https://www.google.com/", + "https://www.github.com/", + } + + // Submit tasks to fetch each URL + for _, url := range urls { + url := url + group.Submit(func() error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := http.DefaultClient.Do(req) + if err == nil { + resp.Body.Close() + } + return err + }) + } + + // Wait for all HTTP requests to complete. + err := group.Wait() + if err != nil { + fmt.Printf("Failed to fetch URLs: %v", err) + } else { + fmt.Println("Successfully fetched all URLs") + } +} +``` + ### Pool Configuration Options - **MinWorkers**: Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example: @@ -213,7 +267,8 @@ In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how - [Creating a worker pool with fixed size](./examples/fixed_size/fixed_size.go) - [Creating a worker pool with a Context](./examples/pool_context/pool_context.go) - [Exporting worker pool metrics to Prometheus](./examples/prometheus/prometheus.go) -- [Submitting groups of related tasks](./examples/task_group/task_group.go) +- [Submitting a group of tasks](./examples/task_group/task_group.go) +- [Submitting a group of tasks associated to a context](./examples/group_context/group_context.go) ## API Reference diff --git a/examples/dynamic_size/go.mod b/examples/dynamic_size/go.mod index 9563074..ac435a2 100644 --- a/examples/dynamic_size/go.mod +++ b/examples/dynamic_size/go.mod @@ -1,9 +1,9 @@ module github.com/alitto/pond/examples/dynamic_size -go 1.17 +go 1.18 require ( - github.com/alitto/pond v1.7.0 + github.com/alitto/pond v1.7.1 ) replace github.com/alitto/pond => ../../ diff --git a/examples/fixed_size/go.mod b/examples/fixed_size/go.mod index 3461f0e..fd8d670 100644 --- a/examples/fixed_size/go.mod +++ b/examples/fixed_size/go.mod @@ -1,9 +1,9 @@ module github.com/alitto/pond/examples/fixed_size -go 1.17 +go 1.18 require ( - github.com/alitto/pond v1.7.0 + github.com/alitto/pond v1.7.1 ) replace github.com/alitto/pond => ../../ diff --git a/examples/group_context/go.mod b/examples/group_context/go.mod new file mode 100644 index 0000000..b5ea1e1 --- /dev/null +++ b/examples/group_context/go.mod @@ -0,0 +1,9 @@ +module github.com/alitto/pond/examples/group_context + +go 1.18 + +require ( + github.com/alitto/pond v1.7.1 +) + +replace github.com/alitto/pond => ../../ diff --git a/examples/group_context/group_context.go b/examples/group_context/group_context.go new file mode 100644 index 0000000..99e97db --- /dev/null +++ b/examples/group_context/group_context.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "fmt" + "net/http" + + "github.com/alitto/pond" +) + +func main() { + + // Create a worker pool + pool := pond.New(10, 1000) + defer pool.StopAndWait() + + // Create a task group associated to a context + group, ctx := pool.GroupContext(context.Background()) + + var urls = []string{ + "https://www.golang.org/", + "https://www.google.com/", + "https://www.github.com/", + } + + // Submit tasks to fetch each URL + for _, url := range urls { + url := url + group.Submit(func() error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := http.DefaultClient.Do(req) + if err == nil { + resp.Body.Close() + } + return err + }) + } + + // Wait for all HTTP requests to complete. + err := group.Wait() + if err != nil { + fmt.Printf("Failed to fetch URLs: %v", err) + } else { + fmt.Println("Successfully fetched all URLs") + } +} diff --git a/examples/pool_context/go.mod b/examples/pool_context/go.mod index 9fe755d..4e6ef6c 100644 --- a/examples/pool_context/go.mod +++ b/examples/pool_context/go.mod @@ -1,7 +1,7 @@ module github.com/alitto/pond/examples/pool_context -go 1.17 +go 1.18 -require github.com/alitto/pond v1.7.0 +require github.com/alitto/pond v1.7.1 replace github.com/alitto/pond => ../../ diff --git a/examples/prometheus/go.mod b/examples/prometheus/go.mod index 40d0388..34f426d 100644 --- a/examples/prometheus/go.mod +++ b/examples/prometheus/go.mod @@ -1,9 +1,9 @@ module github.com/alitto/pond/examples/fixed_size -go 1.17 +go 1.18 require ( - github.com/alitto/pond v1.7.0 + github.com/alitto/pond v1.7.1 github.com/prometheus/client_golang v1.9.0 ) diff --git a/examples/task_group/go.mod b/examples/task_group/go.mod index b4fd505..b6d1ab4 100644 --- a/examples/task_group/go.mod +++ b/examples/task_group/go.mod @@ -1,9 +1,9 @@ module github.com/alitto/pond/examples/task_group -go 1.17 +go 1.18 require ( - github.com/alitto/pond v1.7.0 + github.com/alitto/pond v1.7.1 ) replace github.com/alitto/pond => ../../ diff --git a/go.mod b/go.mod index c0a402f..0434d67 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/alitto/pond -go 1.17 +go 1.18 diff --git a/group.go b/group.go new file mode 100644 index 0000000..6df9d34 --- /dev/null +++ b/group.go @@ -0,0 +1,90 @@ +package pond + +import ( + "context" + "sync" +) + +// TaskGroup represents a group of related tasks +type TaskGroup struct { + pool *WorkerPool + waitGroup sync.WaitGroup +} + +// Submit adds a task to this group and sends it to the worker pool to be executed +func (g *TaskGroup) Submit(task func()) { + g.waitGroup.Add(1) + + g.pool.Submit(func() { + defer g.waitGroup.Done() + + task() + }) +} + +// Wait waits until all the tasks in this group have completed +func (g *TaskGroup) Wait() { + + // Wait for all tasks to complete + g.waitGroup.Wait() +} + +// TaskGroup represents a group of related tasks associated to a context +type TaskGroupWithContext struct { + TaskGroup + ctx context.Context + cancel context.CancelFunc + errOnce sync.Once + err error +} + +// Submit adds a task to this group and sends it to the worker pool to be executed +func (g *TaskGroupWithContext) Submit(task func() error) { + g.waitGroup.Add(1) + + g.pool.Submit(func() { + defer g.waitGroup.Done() + + // If context has already been cancelled, skip task execution + if g.ctx != nil { + select { + case <-g.ctx.Done(): + return + default: + } + } + + // don't actually ignore errors + err := task() + if err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }) +} + +// Wait blocks until either all the tasks submitted to this group have completed, +// one of them returned a non-nil error or the context associated to this group +// was canceled. +func (g *TaskGroupWithContext) Wait() error { + + // Wait for all tasks to complete + tasksCompleted := make(chan struct{}) + go func() { + g.waitGroup.Wait() + tasksCompleted <- struct{}{} + }() + + select { + case <-tasksCompleted: + // If context was provided, cancel it to signal all running tasks to stop + g.cancel() + case <-g.ctx.Done(): + } + + return g.err +} diff --git a/group_blackbox_test.go b/group_blackbox_test.go new file mode 100644 index 0000000..306f832 --- /dev/null +++ b/group_blackbox_test.go @@ -0,0 +1,107 @@ +package pond_test + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/alitto/pond" +) + +func TestGroupSubmit(t *testing.T) { + + pool := pond.New(5, 1000) + assertEqual(t, 0, pool.RunningWorkers()) + + // Submit groups of tasks + var doneCount, taskCount int32 + var groups []*pond.TaskGroup + for i := 0; i < 5; i++ { + group := pool.Group() + for j := 0; j < i+5; j++ { + group.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + taskCount++ + } + groups = append(groups, group) + } + + // Wait for all groups to complete + for _, group := range groups { + group.Wait() + } + + assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount)) +} + +func TestGroupContext(t *testing.T) { + + pool := pond.New(3, 100) + assertEqual(t, 0, pool.RunningWorkers()) + + // Submit a group of tasks + var doneCount, startedCount int32 + group, ctx := pool.GroupContext(context.Background()) + for i := 0; i < 10; i++ { + group.Submit(func() error { + atomic.AddInt32(&startedCount, 1) + + select { + case <-time.After(5 * time.Millisecond): + atomic.AddInt32(&doneCount, 1) + case <-ctx.Done(): + } + + return nil + }) + } + + err := group.Wait() + assertEqual(t, nil, err) + assertEqual(t, int32(10), atomic.LoadInt32(&startedCount)) + assertEqual(t, int32(10), atomic.LoadInt32(&doneCount)) +} + +func TestGroupContextWithError(t *testing.T) { + + pool := pond.New(1, 100) + assertEqual(t, 0, pool.RunningWorkers()) + + expectedErr := errors.New("Something went wrong") + + // Submit a group of tasks + var doneCount, startedCount int32 + group, ctx := pool.GroupContext(context.Background()) + for i := 0; i < 10; i++ { + n := i + group.Submit(func() error { + atomic.AddInt32(&startedCount, 1) + + // Task number 5 fails + if n == 4 { + time.Sleep(10 * time.Millisecond) + return expectedErr + } + + select { + case <-time.After(5 * time.Millisecond): + atomic.AddInt32(&doneCount, 1) + case <-ctx.Done(): + } + + return nil + }) + } + + err := group.Wait() + assertEqual(t, expectedErr, err) + + pool.StopAndWait() + + assertEqual(t, int32(5), atomic.LoadInt32(&startedCount)) + assertEqual(t, int32(4), atomic.LoadInt32(&doneCount)) +} diff --git a/pond.go b/pond.go index 865f3e1..d025b12 100644 --- a/pond.go +++ b/pond.go @@ -487,6 +487,21 @@ func (p *WorkerPool) Group() *TaskGroup { } } +// GroupContext creates a new task group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function submitted to the group +// returns a non-nil error or the first time Wait returns, whichever occurs first. +func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &TaskGroupWithContext{ + TaskGroup: TaskGroup{ + pool: p, + }, + ctx: ctx, + cancel: cancel, + }, ctx +} + // worker launches a worker goroutine func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) { @@ -528,25 +543,3 @@ func worker(context context.Context, firstTask func(), tasks <-chan func(), idle } } } - -// TaskGroup represents a group of related tasks -type TaskGroup struct { - pool *WorkerPool - waitGroup sync.WaitGroup -} - -// Submit adds a task to this group and sends it to the worker pool to be executed -func (g *TaskGroup) Submit(task func()) { - g.waitGroup.Add(1) - g.pool.Submit(func() { - defer g.waitGroup.Done() - task() - }) -} - -// Wait waits until all the tasks in this group have completed -func (g *TaskGroup) Wait() { - - // Wait for all tasks to complete - g.waitGroup.Wait() -} diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go index f81280e..ca10b11 100644 --- a/pond_blackbox_test.go +++ b/pond_blackbox_test.go @@ -450,34 +450,6 @@ func TestPoolWithCustomMinWorkers(t *testing.T) { assertEqual(t, 0, pool.RunningWorkers()) } -func TestGroupSubmit(t *testing.T) { - - pool := pond.New(5, 1000) - assertEqual(t, 0, pool.RunningWorkers()) - - // Submit groups of tasks - var doneCount, taskCount int32 - var groups []*pond.TaskGroup - for i := 0; i < 5; i++ { - group := pool.Group() - for j := 0; j < i+5; j++ { - group.Submit(func() { - time.Sleep(1 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - }) - taskCount++ - } - groups = append(groups, group) - } - - // Wait for all groups to complete - for _, group := range groups { - group.Wait() - } - - assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount)) -} - func TestPoolWithCustomStrategy(t *testing.T) { pool := pond.New(3, 3, pond.Strategy(pond.RatedResizer(2)))