/
spara.go
executable file
·164 lines (148 loc) · 5.08 KB
/
spara.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Package spara provides a couple of functions for concurrent mapping over
// elements of a slice with early cancellation.
package spara
import (
"context"
"errors"
"sync"
"sync/atomic"
)
// Sentinel errors returned when the arguments passed to Run or
// RunWithContext fail validation.
var (
	// ErrInvalidWorkers is returned by RunWithContext when workers <= 0.
	ErrInvalidWorkers = errors.New("spara: invalid number of workers")
	// ErrInvalidIterations is returned by RunWithContext when iterations < 0.
	ErrInvalidIterations = errors.New("spara: invalid number of iterations")
	// ErrNilMappingFunction is returned by Run and RunWithContext when the
	// mapping function is nil.
	ErrNilMappingFunction = errors.New("spara: mapping function must not be nil")
	// ErrNilContext is returned by RunWithContext when the parent context is
	// nil.
	ErrNilContext = errors.New("spara: context must not be nil")
)
// Run invokes fn once for every index in [0, iterations), spreading the calls
// over at most workers goroutines. As soon as one call returns an error, no
// further indices are handed out; Run then waits for the calls that are
// already in flight and returns that first error.
//
// A nil fn is rejected with ErrNilMappingFunction before any other argument
// is examined. All remaining validation and the actual scheduling are
// delegated to RunWithContext using context.Background() as the parent.
func Run(workers int, iterations int, fn func(index int) error) error {
	if fn == nil {
		return ErrNilMappingFunction
	}
	// Adapt the context-free callback to the context-aware signature; the
	// context handed over by RunWithContext is deliberately dropped.
	return RunWithContext(
		context.Background(),
		workers,
		iterations,
		func(_ context.Context, i int) error {
			return fn(i)
		},
	)
}
// MappingFunc is the signature of the function that RunWithContext calls for
// each index. It receives the context RunWithContext derives from the parent
// context together with the index to process; returning a non-nil error stops
// further iteration.
type MappingFunc func(ctx context.Context, index int) error
// RunWithContext is like Run, but it accepts a parent context. Completion of
// this context will stop iteration early if possible. If completion causes
// iteration to stop, the error returned from RunWithContext will be the value
// of the parent context's Err().
//
// RunWithContext passes a context to the mapping function as well. This will
// be a child of the provided parent context, and will complete either on the
// first returned error, the parent context completing, or all of the worker
// goroutines returning.
//
// This method can give very large performance improvements when elements of
// the mapping function support context for early cancellation (eg
// http.Request's WithContext). Imagine a function that downloads multiple
// files in parallel. Using plain Run, the first error would cause no new
// files to begin downloading, but it still wouldn't return until all of the
// in progress downloads completed. If the in progress downloads are large,
// that could mean you're waiting a very long time for a bunch of data you
// don't actually care about. With early cancellation, these requests would be
// canceled eagerly, and the function could return faster.
//
// Arguments are validated in this order: ErrInvalidWorkers is returned if
// workers <= 0, ErrInvalidIterations if iterations < 0, ErrNilMappingFunction
// if fn is nil, and ErrNilContext if parent is nil. Zero iterations returns
// nil without calling fn. If parent is already done on entry, its Err() is
// returned without calling fn.
func RunWithContext(parent context.Context, workers int, iterations int, fn MappingFunc) error {
	if workers <= 0 {
		return ErrInvalidWorkers
	}
	if iterations < 0 {
		return ErrInvalidIterations
	}
	if fn == nil {
		return ErrNilMappingFunction
	}
	if parent == nil {
		return ErrNilContext
	}
	if iterations == 0 {
		return nil
	}

	// Only need to spawn as many workers as we have iterations.
	if workers > iterations {
		workers = iterations
	}

	// Eagerly check whether the parent context is already done.
	select {
	case <-parent.Done():
		return parent.Err()
	default:
	}

	// cursor holds the most recently handed-out index. It starts at
	// workers-1 because each worker is passed its first index directly.
	//
	// The cursor is deliberately a uint64 rather than a narrower or signed
	// type: it never exceeds iterations+workers (each worker performs at most
	// one increment after the cursor reaches total), which always fits in a
	// uint64, so it can neither be truncated nor wrap around to hand out a
	// negative or repeated index.
	total := uint64(iterations)
	cursor := uint64(workers - 1)

	// Atomically stops iteration inside of the worker functions: every
	// subsequent increment of the cursor yields a value >= total.
	stopIteration := func() {
		atomic.StoreUint64(&cursor, total)
	}

	// Wrap the parent context with cancellation so that we can stop internal
	// processing whenever a worker returns an error.
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	// killOnce records what ended iteration first:
	//   0 = nothing yet, 1 = a worker error, 2 = context completion,
	//   3 = all workers finished normally.
	var killOnce int32
	var firsterr error
	kill := func(err error) {
		// Only execute this on the first call. Only worker functions call
		// kill, so we can be certain that firsterr is safe to read once
		// wg.Wait returns.
		if atomic.CompareAndSwapInt32(&killOnce, 0, 1) {
			stopIteration()
			cancel()
			firsterr = err
		}
	}

	// Start a goroutine that will stop iteration on parent context done. It
	// waits on the wrapped context rather than the parent, so the deferred
	// cancel above always releases it; waiting on a parent that never
	// completes would leak the goroutine.
	//
	// We don't need to spawn if the parent context never completes. The
	// stdlib's context.Background's Done method returns nil, so we check for
	// that to decide what to do.
	parentIsNeverDone := parent.Done() == nil
	if !parentIsNeverDone {
		go func() {
			<-ctx.Done()
			if atomic.CompareAndSwapInt32(&killOnce, 0, 2) {
				stopIteration()
			}
		}()
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(start uint64) {
			defer wg.Done()
			// j < total guarantees j fits in an int, since total was
			// derived from a non-negative int.
			for j := start; j < total; j = atomic.AddUint64(&cursor, 1) {
				if err := fn(ctx, int(j)); err != nil {
					kill(err)
					return
				}
			}
		}(uint64(i))
	}
	wg.Wait()

	// killOnce = 1: a worker reported the first error.
	if firsterr != nil {
		return firsterr
	}

	// firsterr is nil, but the parent context may have been the thing that
	// stopped iteration, ie killOnce = 2. We know it definitely hasn't if the
	// parent doesn't terminate in the first place.
	if parentIsNeverDone {
		return nil
	}

	// killOnce = 0: the context did not stop iteration. Claim the slot so the
	// watcher goroutine becomes a no-op when the deferred cancel fires.
	if atomic.CompareAndSwapInt32(&killOnce, 0, 3) {
		return nil
	}

	// killOnce = 2: context completion stopped iteration.
	return ctx.Err()
}