Skip to content

Commit

Permalink
Merge pull request #1315 from libp2p/fix/flaky-rcmgr-test
Browse files Browse the repository at this point in the history
fix flaky resource manager tests
  • Loading branch information
vyzo committed Feb 6, 2022
2 parents c681541 + edc86d9 commit f2fa0b6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
19 changes: 19 additions & 0 deletions itest/echo.go
Expand Up @@ -30,6 +30,7 @@ type Echo struct {
status EchoStatus

beforeReserve, beforeRead, beforeWrite, beforeDone func() error
done func()
}

type EchoStatus struct {
Expand Down Expand Up @@ -81,6 +82,13 @@ func (e *Echo) BeforeDone(f func() error) {
e.beforeDone = f
}

func (e *Echo) Done(f func()) {
e.mx.Lock()
defer e.mx.Unlock()

e.done = f
}

func (e *Echo) getBeforeReserve() func() error {
e.mx.Lock()
defer e.mx.Unlock()
Expand Down Expand Up @@ -109,9 +117,20 @@ func (e *Echo) getBeforeDone() func() error {
return e.beforeDone
}

func (e *Echo) getDone() func() {
e.mx.Lock()
defer e.mx.Unlock()

return e.done
}

func (e *Echo) handleStream(s network.Stream) {
defer s.Close()

if done := e.getDone(); done != nil {
defer done()
}

e.mx.Lock()
e.status.StreamsIn++
e.mx.Unlock()
Expand Down
34 changes: 23 additions & 11 deletions itest/rcmgr_test.go
Expand Up @@ -109,9 +109,6 @@ func TestResourceManagerServiceInbound(t *testing.T) {
defer closeEchos(echos)
defer closeRcmgrs(echos)

ready := make(chan struct{})
echos[0].BeforeDone(waitForChannel(ready, time.Minute))

for i := 1; i < 5; i++ {
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
if err != nil {
Expand All @@ -120,9 +117,16 @@ func TestResourceManagerServiceInbound(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

ready := make(chan struct{})
echos[0].BeforeDone(waitForChannel(ready, time.Minute))

var eg sync.WaitGroup
echos[0].Done(eg.Done)

var once sync.Once
var wg sync.WaitGroup
for i := 1; i < 5; i++ {
eg.Add(1)
wg.Add(1)
go func(i int) {
defer wg.Done()
Expand All @@ -137,6 +141,7 @@ func TestResourceManagerServiceInbound(t *testing.T) {
}(i)
}
wg.Wait()
eg.Wait()

checkEchoStatus(t, echos[0], EchoStatus{
StreamsIn: 4,
Expand All @@ -157,11 +162,6 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
defer closeEchos(echos)
defer closeRcmgrs(echos)

count := new(int32)
ready := make(chan struct{})
*count = 4
echos[0].BeforeDone(waitForBarrier(count, ready, time.Minute))

for i := 1; i < 5; i++ {
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
if err != nil {
Expand All @@ -170,8 +170,14 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

echos[0].BeforeDone(waitForBarrier(4, time.Minute))

var eg sync.WaitGroup
echos[0].Done(eg.Done)

var wg sync.WaitGroup
for i := 1; i < 5; i++ {
eg.Add(1)
wg.Add(1)
go func(i int) {
defer wg.Done()
Expand All @@ -183,6 +189,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
}(i)
}
wg.Wait()
eg.Wait()

checkEchoStatus(t, echos[0], EchoStatus{
StreamsIn: 4,
Expand All @@ -191,11 +198,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
ResourceServiceErrors: 0,
})

ready = make(chan struct{})
ready := make(chan struct{})
echos[0].BeforeDone(waitForChannel(ready, time.Minute))

var once sync.Once
for i := 0; i < 3; i++ {
eg.Add(1)
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -210,6 +218,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
}()
}
wg.Wait()
eg.Wait()

checkEchoStatus(t, echos[0], EchoStatus{
StreamsIn: 7,
Expand All @@ -219,9 +228,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
})
}

func waitForBarrier(count *int32, ready chan struct{}, timeout time.Duration) func() error {
func waitForBarrier(count int32, timeout time.Duration) func() error {
ready := make(chan struct{})
wait := new(int32)
*wait = count
return func() error {
if atomic.AddInt32(count, -1) == 0 {
if atomic.AddInt32(wait, -1) == 0 {
close(ready)
}

Expand Down

0 comments on commit f2fa0b6

Please sign in to comment.