forked from jrallison/go-workers
-
Notifications
You must be signed in to change notification settings - Fork 2
/
workers_test.go
130 lines (93 loc) · 2.19 KB
/
workers_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package workers
import (
"reflect"
"time"
"github.com/customerio/gospec"
. "github.com/customerio/gospec"
)
var called chan bool
func myJob(message *Msg) error {
called <- true
return nil
}
var quitted chan bool
func blckJob(message *Msg) error {
var err error
for err == nil {
time.Sleep(100 * time.Millisecond)
err = message.Context.Err()
}
quitted <- true
return err
}
func WorkersSpec(c gospec.Context) {
c.Specify("Workers", func() {
c.Specify("allows running in tests", func() {
called = make(chan bool)
Process("myqueue", myJob, 10)
Start()
Enqueue("myqueue", "Add", []int{1, 2})
<-called
Quit()
})
// TODO make this test more deterministic, randomly locks up in travis.
//c.Specify("allows starting and stopping multiple times", func() {
// called = make(chan bool)
// Process("myqueue", myJob, 10)
// Start()
// Quit()
// Start()
// Enqueue("myqueue", "Add", []int{1, 2})
// <-called
// Quit()
//})
c.Specify("runs beforeStart hooks", func() {
hooks := []string{}
BeforeStart(func() {
hooks = append(hooks, "1")
})
BeforeStart(func() {
hooks = append(hooks, "2")
})
BeforeStart(func() {
hooks = append(hooks, "3")
})
Start()
c.Expect(reflect.DeepEqual(hooks, []string{"1", "2", "3"}), IsTrue)
Quit()
// Clear out global hooks variable
beforeStart = nil
})
c.Specify("runs beforeStart hooks", func() {
hooks := []string{}
DuringDrain(func() {
hooks = append(hooks, "1")
})
DuringDrain(func() {
hooks = append(hooks, "2")
})
DuringDrain(func() {
hooks = append(hooks, "3")
})
Start()
c.Expect(reflect.DeepEqual(hooks, []string{}), IsTrue)
Quit()
c.Expect(reflect.DeepEqual(hooks, []string{"1", "2", "3"}), IsTrue)
// Clear out global hooks variable
duringDrain = nil
})
c.Specify("runs beforeStart hooks", func() {
quitted = make(chan bool)
Process("myqueue", blckJob, 10)
Start()
jid, err := Enqueue("myqueue", "Add", []int{1, 2})
c.Expect(err, IsNil)
time.Sleep(100 * time.Millisecond)
c.Expect(len(quitted), Equals, 0)
err = CancelJob(jid)
c.Expect(err, IsNil)
<-quitted
Quit()
})
})
}