Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement task group associated to a Context & more #30

Merged
merged 1 commit into from May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Expand Up @@ -10,7 +10,7 @@ jobs:
name: Test
strategy:
matrix:
go-version: [1.15.x, 1.16.x, 1.17.x]
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
63 changes: 59 additions & 4 deletions README.md
Expand Up @@ -25,7 +25,8 @@ Some common scenarios include:
- Submitting tasks to a pool in a fire-and-forget fashion
- Submitting tasks to a pool and waiting for them to complete
- Submitting tasks to a pool with a deadline
- Submitting a group of related tasks and waiting for them to complete
- Submitting a group of tasks and waiting for them to complete
- Submitting a group of tasks associated to a Context
- Getting the number of running workers (goroutines)
- Stopping a worker pool
- Task panics are handled gracefully (configurable panic handler)
Expand Down Expand Up @@ -104,7 +105,7 @@ func main() {
}
```

### Submitting groups of related tasks
### Submitting a group of tasks

``` go
package main
Expand All @@ -124,7 +125,7 @@ func main() {
// Create a task group
group := pool.Group()

// Submit a group of related tasks
// Submit a group of tasks
for i := 0; i < 20; i++ {
n := i
group.Submit(func() {
Expand All @@ -137,6 +138,59 @@ func main() {
}
```

### Submitting a group of tasks associated to a context (**since v1.8.0**)

This feature provides synchronization, error propagation, and Context cancelation for subtasks of a common task. Similar to `errgroup.Group` from [`golang.org/x/sync/errgroup`](https://pkg.go.dev/golang.org/x/sync/errgroup) package with concurrency bounded by the worker pool.

``` go
package main

import (
"context"
"fmt"
"net/http"

"github.com/alitto/pond"
)

func main() {

// Create a worker pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()

// Create a task group associated to a context
group, ctx := pool.GroupContext(context.Background())

var urls = []string{
"https://www.golang.org/",
"https://www.google.com/",
"https://www.github.com/",
}

// Submit tasks to fetch each URL
for _, url := range urls {
url := url
group.Submit(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
return err
})
}

// Wait for all HTTP requests to complete.
err := group.Wait()
if err != nil {
fmt.Printf("Failed to fetch URLs: %v", err)
} else {
fmt.Println("Successfully fetched all URLs")
}
}
```

### Pool Configuration Options

- **MinWorkers**: Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example:
Expand Down Expand Up @@ -213,7 +267,8 @@ In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how
- [Creating a worker pool with fixed size](./examples/fixed_size/fixed_size.go)
- [Creating a worker pool with a Context](./examples/pool_context/pool_context.go)
- [Exporting worker pool metrics to Prometheus](./examples/prometheus/prometheus.go)
- [Submitting groups of related tasks](./examples/task_group/task_group.go)
- [Submitting a group of tasks](./examples/task_group/task_group.go)
- [Submitting a group of tasks associated to a context](./examples/group_context/group_context.go)

## API Reference

Expand Down
4 changes: 2 additions & 2 deletions examples/dynamic_size/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/dynamic_size

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
4 changes: 2 additions & 2 deletions examples/fixed_size/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
9 changes: 9 additions & 0 deletions examples/group_context/go.mod
@@ -0,0 +1,9 @@
module github.com/alitto/pond/examples/group_context

go 1.18

require (
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
46 changes: 46 additions & 0 deletions examples/group_context/group_context.go
@@ -0,0 +1,46 @@
package main

import (
"context"
"fmt"
"net/http"

"github.com/alitto/pond"
)

func main() {

// Create a worker pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()

// Create a task group associated to a context
group, ctx := pool.GroupContext(context.Background())

var urls = []string{
"https://www.golang.org/",
"https://www.google.com/",
"https://www.github.com/",
}

// Submit tasks to fetch each URL
for _, url := range urls {
url := url
group.Submit(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
return err
})
}

// Wait for all HTTP requests to complete.
err := group.Wait()
if err != nil {
fmt.Printf("Failed to fetch URLs: %v", err)
} else {
fmt.Println("Successfully fetched all URLs")
}
}
4 changes: 2 additions & 2 deletions examples/pool_context/go.mod
@@ -1,7 +1,7 @@
module github.com/alitto/pond/examples/pool_context

go 1.17
go 1.18

require github.com/alitto/pond v1.7.0
require github.com/alitto/pond v1.7.1

replace github.com/alitto/pond => ../../
4 changes: 2 additions & 2 deletions examples/prometheus/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
github.com/prometheus/client_golang v1.9.0
)

Expand Down
4 changes: 2 additions & 2 deletions examples/task_group/go.mod
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/task_group

go 1.17
go 1.18

require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)

replace github.com/alitto/pond => ../../
2 changes: 1 addition & 1 deletion go.mod
@@ -1,3 +1,3 @@
module github.com/alitto/pond

go 1.17
go 1.18
90 changes: 90 additions & 0 deletions group.go
@@ -0,0 +1,90 @@
package pond

import (
"context"
"sync"
)

// TaskGroup represents a group of related tasks
type TaskGroup struct {
pool *WorkerPool
waitGroup sync.WaitGroup
}

// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroup) Submit(task func()) {
g.waitGroup.Add(1)

g.pool.Submit(func() {
defer g.waitGroup.Done()

task()
})
}

// Wait waits until all the tasks in this group have completed
func (g *TaskGroup) Wait() {

// Wait for all tasks to complete
g.waitGroup.Wait()
}

// TaskGroup represents a group of related tasks associated to a context
type TaskGroupWithContext struct {
TaskGroup
ctx context.Context
cancel context.CancelFunc
errOnce sync.Once
err error
}

// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroupWithContext) Submit(task func() error) {
g.waitGroup.Add(1)

g.pool.Submit(func() {
defer g.waitGroup.Done()

// If context has already been cancelled, skip task execution
if g.ctx != nil {
select {
case <-g.ctx.Done():
return
default:
}
}

// don't actually ignore errors
err := task()
if err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
})
}

// Wait blocks until either all the tasks submitted to this group have completed,
// one of them returned a non-nil error or the context associated to this group
// was canceled.
func (g *TaskGroupWithContext) Wait() error {

// Wait for all tasks to complete
tasksCompleted := make(chan struct{})
go func() {
g.waitGroup.Wait()
tasksCompleted <- struct{}{}
}()

select {
case <-tasksCompleted:
// If context was provided, cancel it to signal all running tasks to stop
g.cancel()
case <-g.ctx.Done():
}

return g.err
}