forked from Azure/go-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
manualCreditor_test.go
113 lines (83 loc) · 2.66 KB
/
manualCreditor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package amqp
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestManualCreditorIssueCredits checks that credits issued through
// IssueCredit are reported by the next FlowBits call, and that a following
// FlowBits call (with nothing newly issued and no drain requested) reports
// no credits.
func TestManualCreditorIssueCredits(t *testing.T) {
	mc := manualCreditor{}
	require.NoError(t, mc.IssueCredit(3))

	drain, credits := mc.FlowBits(1)
	require.False(t, drain)
	require.EqualValues(t, 3+1, credits, "flow frame includes the pending credits and our current credits")

	// flow clears the previous data once it's been called, so this second
	// call has no pending credits to report. No drain was requested either.
	drain, credits = mc.FlowBits(4)
	require.False(t, drain)
	require.EqualValues(t, 0, credits, "pending credits were cleared by the previous flow, so none are reported")
}
// TestManualCreditorDrain starts two concurrent Drain calls and verifies that
// only one of them is allowed to proceed: FlowBits reports a drain with zero
// credits, EndDrain releases the blocked caller, and the other caller fails
// with errAlreadyDraining.
func TestManualCreditorDrain(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
	defer cancel()

	mc := manualCreditor{}
	require.NoError(t, mc.IssueCredit(3))

	// only one drain allowed at a time.
	drainRoutines := sync.WaitGroup{}
	drainRoutines.Add(2)

	// each goroutine writes its own variable; both are only read after
	// drainRoutines.Wait() returns.
	var err1, err2 error

	go func() {
		defer drainRoutines.Done()
		err1 = mc.Drain(ctx)
	}()

	go func() {
		defer drainRoutines.Done()
		err2 = mc.Drain(ctx)
	}()

	// one of the drain calls will have succeeded, the other one should still be blocking.
	time.Sleep(time.Second * 2)

	// the next time someone requests a flow frame it'll drain (this doesn't affect the blocked Drain() calls)
	drain, credits := mc.FlowBits(101)
	require.True(t, drain)
	require.EqualValues(t, 0, credits, "Drain always drains with 0 credit")

	// unblock the last of the drainers
	mc.EndDrain()
	require.Nil(t, mc.drained, "drain completes and removes the drained channel")

	// wait for all the drain routines to end
	drainRoutines.Wait()

	// one of them should have failed (if both succeeded we've somehow let them both run)
	require.False(t, err1 == nil && err2 == nil)

	// require.Error only checks for a non-nil error (its trailing arguments
	// are the failure message), so use EqualError to verify that the failure
	// really is errAlreadyDraining.
	if err1 == nil {
		require.EqualError(t, err2, errAlreadyDraining.Error())
	} else {
		require.EqualError(t, err1, errAlreadyDraining.Error())
	}
}
// TestManualCreditorIssueCreditsWhileDrainingFails verifies that IssueCredit
// is rejected with errLinkDraining while a Drain call is in progress, and that
// the Drain call itself completes without error once EndDrain is called.
func TestManualCreditorIssueCreditsWhileDrainingFails(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
	defer cancel()

	mc := manualCreditor{}
	require.NoError(t, mc.IssueCredit(3))

	// The drain result is captured here and asserted on the test goroutine
	// after wg.Wait(): require.* calls t.FailNow, which must not be invoked
	// from a goroutine other than the one running the test.
	var drainErr error

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		defer wg.Done()
		drainErr = mc.Drain(ctx)
	}()

	// give the goroutine time to start draining.
	time.Sleep(time.Second * 2)

	// drain is still active, so...
	// (EqualError rather than Error: Error would only check for non-nil and
	// treat the expected text as a failure message.)
	require.EqualError(t, mc.IssueCredit(1), errLinkDraining.Error())

	mc.EndDrain()
	wg.Wait()

	require.NoError(t, drainErr)
}
func TestManualCreditorDrainRespectsContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
mc := manualCreditor{}
cancel()
require.Error(t, mc.Drain(ctx), context.Canceled.Error())
}