-
Notifications
You must be signed in to change notification settings - Fork 0
/
conman.go
75 lines (63 loc) · 1.66 KB
/
conman.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Author: Ilyess Bachiri
// Copyright (c) 2021-present Ilyess Bachiri
// Package conman, a concurrency management package
package conman
import "sync"
// ConMan a structure to manage multiple tasks running
// concurrently while ensuring the total number of running
// tasks doesn't exceed a certain concurrency limit
type ConMan struct {
wg sync.WaitGroup
errors []error
outputs []interface{}
buffer chan interface{}
}
// New creates a new ConMan instance
func New(concurrencyLimit int64) ConMan {
return ConMan{buffer: make(chan interface{}, concurrencyLimit)}
}
// Task an interface that defines task execution
type Task interface {
Execute() (interface{}, error)
}
// Run runs a task function
// If the concurrency limit is reached, it waits until a running
// task is done.
//
// A task function must not take in any parameters, and must return
// a interface{}-error pair
// e.g.: func () (interface{}, error) {}
func (c *ConMan) Run(t Task) {
c.reserveOne()
go func() {
defer c.releaseOne()
op, err := t.Execute()
if err != nil {
c.errors = append(c.errors, err)
} else {
c.outputs = append(c.outputs, op)
}
}()
}
// Wait suspends execution until all running tasks are done
func (c *ConMan) Wait() {
c.wg.Wait()
}
// Outputs returns a slice of returned values from all the tasks
// that did not return an error
func (c *ConMan) Outputs() []interface{} {
return c.outputs
}
// Errors returns a slice of errors that were returned
// by all the tasks run by the Run function
func (c *ConMan) Errors() []error {
return c.errors
}
func (c *ConMan) reserveOne() {
c.buffer <- nil
c.wg.Add(1)
}
func (c *ConMan) releaseOne() {
c.wg.Done()
<-c.buffer
}