Skip to content

Commit

Permalink
Group context
Browse files Browse the repository at this point in the history
  • Loading branch information
alitto committed May 9, 2022
1 parent d4c09d4 commit 583da22
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Expand Up @@ -10,7 +10,7 @@ jobs:
name: Test
strategy:
matrix:
go-version: [1.15.x, 1.16.x, 1.17.x]
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
63 changes: 59 additions & 4 deletions README.md
Expand Up @@ -25,7 +25,8 @@ Some common scenarios include:
- Submitting tasks to a pool in a fire-and-forget fashion
- Submitting tasks to a pool and waiting for them to complete
- Submitting tasks to a pool with a deadline
- Submitting a group of related tasks and waiting for them to complete
- Submitting a group of tasks and waiting for them to complete
- Submitting a group of tasks associated to a Context
- Getting the number of running workers (goroutines)
- Stopping a worker pool
- Task panics are handled gracefully (configurable panic handler)
Expand Down Expand Up @@ -104,7 +105,7 @@ func main() {
}
```

### Submitting groups of related tasks
### Submitting a group of tasks

``` go
package main
Expand All @@ -124,7 +125,7 @@ func main() {
// Create a task group
group := pool.Group()

// Submit a group of related tasks
// Submit a group of tasks
for i := 0; i < 20; i++ {
n := i
group.Submit(func() {
Expand All @@ -137,6 +138,59 @@ func main() {
}
```

### Submitting a group of tasks associated to a context (**v1.8+**)

This feature provides synchronization, error propagation, and Context cancelation for subtasks of a common task.

``` go
package main

import (
"context"
"fmt"
"net/http"

"github.com/alitto/pond"
)

func main() {

// Create a worker pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()

// Create a task group associated to a context
group, ctx := pool.GroupContext(context.Background())

var urls = []string{
"https://www.golang.org/",
"https://www.google.com/",
"https://www.github.com/",
}

// Submit tasks to fetch each URL
for _, url := range urls {
url := url
group.Submit(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
return err
})
}

// Wait for all HTTP requests to complete.
err := group.Wait()
if err != nil {
fmt.Printf("Failed to fetch URLs: %v", err)
} else {
fmt.Println("Successfully fetched all URLs")
}
}
```

### Pool Configuration Options

- **MinWorkers**: Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example:
Expand Down Expand Up @@ -213,7 +267,8 @@ In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how
- [Creating a worker pool with fixed size](./examples/fixed_size/fixed_size.go)
- [Creating a worker pool with a Context](./examples/pool_context/pool_context.go)
- [Exporting worker pool metrics to Prometheus](./examples/prometheus/prometheus.go)
- [Submitting groups of related tasks](./examples/task_group/task_group.go)
- [Submitting a group of tasks](./examples/task_group/task_group.go)
- [Submitting a group of tasks associated to a context](./examples/group_context/group_context.go)

## API Reference

Expand Down
4 changes: 2 additions & 2 deletions examples/dynamic_size/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/dynamic_size

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
4 changes: 2 additions & 2 deletions examples/fixed_size/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
9 changes: 9 additions & 0 deletions examples/group_context/go.mod
@@ -0,0 +1,9 @@
module github.com/alitto/pond/examples/group_context

go 1.18

require (
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
46 changes: 46 additions & 0 deletions examples/group_context/group_context.go
@@ -0,0 +1,46 @@
package main

import (
"context"
"fmt"
"net/http"

"github.com/alitto/pond"
)

func main() {

// Create a worker pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()

// Create a task group associated to a context
group, ctx := pool.GroupContext(context.Background())

var urls = []string{
"https://www.golang.org/",
"https://www.google.com/",
"https://www.github.com/",
}

// Submit tasks to fetch each URL
for _, url := range urls {
url := url
group.Submit(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
return err
})
}

// Wait for all HTTP requests to complete.
err := group.Wait()
if err != nil {
fmt.Printf("Failed to fetch URLs: %v", err)
} else {
fmt.Println("Successfully fetched all URLs")
}
}
4 changes: 2 additions & 2 deletions examples/pool_context/go.mod
@@ -1,7 +1,7 @@
module github.com/alitto/pond/examples/pool_context

go 1.17
go 1.18

require github.com/alitto/pond v1.7.0
require github.com/alitto/pond v1.7.1

replace github.com/alitto/pond => ../../
4 changes: 2 additions & 2 deletions examples/prometheus/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
github.com/prometheus/client_golang v1.9.0
)

Expand Down
4 changes: 2 additions & 2 deletions examples/task_group/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/task_group

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
2 changes: 1 addition & 1 deletion go.mod
@@ -1,3 +1,3 @@
module github.com/alitto/pond

go 1.17
go 1.18
90 changes: 90 additions & 0 deletions group.go
@@ -0,0 +1,90 @@
package pond

import (
"context"
"sync"
)

// TaskGroup represents a group of related tasks
type TaskGroup struct {
pool *WorkerPool
waitGroup sync.WaitGroup
}

// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroup) Submit(task func()) {
g.waitGroup.Add(1)

g.pool.Submit(func() {
defer g.waitGroup.Done()

task()
})
}

// Wait waits until all the tasks in this group have completed
func (g *TaskGroup) Wait() {

// Wait for all tasks to complete
g.waitGroup.Wait()
}

// TaskGroup represents a group of related tasks associated to a context
type TaskGroupWithContext struct {
TaskGroup
ctx context.Context
cancel context.CancelFunc
errOnce sync.Once
err error
}

// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroupWithContext) Submit(task func() error) {
g.waitGroup.Add(1)

g.pool.Submit(func() {
defer g.waitGroup.Done()

// If context has already been cancelled, skip task execution
if g.ctx != nil {
select {
case <-g.ctx.Done():
return
default:
}
}

// don't actually ignore errors
err := task()
if err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
})
}

// Wait blocks until either all the tasks submitted to this group have completed,
// one of them returned a non-nil error or the context associated to this group
// was canceled.
func (g *TaskGroupWithContext) Wait() error {

// Wait for all tasks to complete
tasksCompleted := make(chan struct{})
go func() {
g.waitGroup.Wait()
tasksCompleted <- struct{}{}
}()

select {
case <-tasksCompleted:
// If context was provided, cancel it to signal all running tasks to stop
g.cancel()
case <-g.ctx.Done():
}

return g.err
}

0 comments on commit 583da22

Please sign in to comment.