-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool_executor_test.go
134 lines (110 loc) · 2.61 KB
/
pool_executor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package executors
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestPoolExecutor_Execute submits one runnable that returns normally and one
// that panics with errors.New("test"). It then expects the first runnable to
// have run and the configured error handler to have been invoked with an error
// whose message contains "test".
func TestPoolExecutor_Execute(t *testing.T) {
	type Person struct {
		Name string
	}

	// Upper bound for each wait below, so a callback that never fires fails
	// the test quickly instead of blocking until the global test timeout.
	const waitTimeout = 5 * time.Second

	var (
		executed  bool
		caughtErr error
	)
	handled := make(chan struct{}) // closed by the error handler
	ran := make(chan struct{})     // closed by the successful runnable

	service := NewPoolExecutorService[Person](
		WithMaxConcurrent(10),
		WithErrorHandler(ErrorHandlerFunc(func(runnable Runnable, e error) {
			// The handler is not guaranteed to run on the test goroutine, so
			// it must not call FailNow-based helpers (require.*). It only
			// records the error; the assertions happen after <-handled.
			caughtErr = e
			close(handled)
		})))

	// await blocks until done is closed or waitTimeout elapses. It is only
	// called from the test goroutine, where t.Fatalf is permitted.
	await := func(done <-chan struct{}, what string) {
		t.Helper()
		select {
		case <-done:
		case <-time.After(waitTimeout):
			t.Fatalf("timed out after %s waiting for %s", waitTimeout, what)
		}
	}

	t.Run("run", func(t *testing.T) {
		err := service.Execute(RunnableFunc(func(ctx context.Context) {
			executed = true
			close(ran)
		}))
		require.NoError(t, err)
	})

	t.Run("panic handler", func(t *testing.T) {
		err := service.Execute(RunnableFunc(func(ctx context.Context) {
			panic(errors.New("test"))
		}))
		require.NoError(t, err)
	})

	await(handled, "the error handler to be invoked")
	await(ran, "the runnable to execute")

	// Closing a channel happens-before a receive that observes the close, so
	// the writes made by the callbacks are visible here without extra locking.
	require.True(t, executed)
	require.Error(t, caughtErr)
	// require.Contains takes (haystack, needle): the error message must
	// contain "test", not the other way round.
	require.Contains(t, caughtErr.Error(), "test")
}
// Person is the result type used by the executor tests in this file.
type Person struct{ Name string }
// TestPoolExecutor_Submit submits callables and reads each resulting future
// from many goroutines at once. It covers a callable that succeeds, one that
// returns an error, and one whose future is cancelled while the readers are
// waiting in Get.
func TestPoolExecutor_Submit(t *testing.T) {
	// Number of goroutines that call Get on the same future concurrently.
	const getters = 100

	service := NewPoolExecutorService[Person](WithMaxConcurrent(10))

	// fanOut runs check on `getters` goroutines and waits for all of them.
	// check executes off the test goroutine, so it must report failures with
	// t.Errorf; FailNow-based helpers (t.Fatal, require.*) are only valid on
	// the goroutine running the test.
	fanOut := func(check func()) {
		var wg sync.WaitGroup
		wg.Add(getters)
		for i := 0; i < getters; i++ {
			go func() {
				defer wg.Done()
				check()
			}()
		}
		wg.Wait()
	}

	t.Run("one task success", func(t *testing.T) {
		callable := CallableFunc[Person](func(ctx context.Context) (Person, error) {
			return Person{Name: "future"}, nil
		})
		f, err := service.Submit(callable)
		require.NoError(t, err)

		fanOut(func() {
			got, err := f.Get(context.Background())
			if err != nil {
				t.Errorf("Get returned unexpected error: %v", err)
				return
			}
			if got.Name != "future" {
				t.Errorf("Get returned Name %q, want %q", got.Name, "future")
			}
		})
	})

	t.Run("one task error", func(t *testing.T) {
		targetErr := errors.New("error")
		callable := CallableFunc[Person](func(ctx context.Context) (Person, error) {
			return Person{}, targetErr
		})
		f, err := service.Submit(callable)
		require.NoError(t, err)

		fanOut(func() {
			if _, err := f.Get(context.Background()); !errors.Is(err, targetErr) {
				t.Errorf("Get returned error %v, want %v", err, targetErr)
			}
		})
	})

	t.Run("one task canceled", func(t *testing.T) {
		callable := CallableFunc[Person](func(ctx context.Context) (Person, error) {
			time.Sleep(1 * time.Second)
			return Person{Name: "future"}, nil
		})
		f, err := service.Submit(callable)
		require.NoError(t, err)

		// The timer callback runs on its own goroutine and could otherwise
		// outlive this subtest, so it only forwards Cancel's result. The
		// buffered channel lets it finish even if nobody receives.
		cancelResult := make(chan bool, 1)
		time.AfterFunc(50*time.Millisecond, func() {
			cancelResult <- f.Cancel()
		})

		fanOut(func() {
			if _, err := f.Get(context.Background()); !errors.Is(err, ErrFutureCanceled) {
				t.Errorf("Get returned error %v, want %v", err, ErrFutureCanceled)
			}
		})

		// Assert on Cancel's result from the test goroutine, with a bounded
		// wait in case Cancel never returns.
		select {
		case ok := <-cancelResult:
			require.True(t, ok)
		case <-time.After(5 * time.Second):
			t.Fatal("timed out waiting for Cancel to return")
		}
	})
}