Adds exponential backoff to re-spawning new streams for supposedly dead peers #483

Merged: 20 commits, merged May 30, 2022
Changes from 13 commits
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
cover.out
prof.out
go-floodsub.test

.idea/
103 changes: 103 additions & 0 deletions backoff.go
@@ -0,0 +1,103 @@
package pubsub

import (
"context"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)

const (
MinBackoffDelay = 100 * time.Millisecond
MaxBackoffDelay = 10 * time.Second
TimeToLive = 10 * time.Minute
BackoffCleanupInterval = 1 * time.Minute
BackoffMultiplier = 2
MaxBackoffJitterCoff = 100
MaxBackoffAttempts = 4
)

type backoffHistory struct {
duration time.Duration
lastTried time.Time
attempts int
}

type backoff struct {
mu sync.Mutex
info map[peer.ID]*backoffHistory
ct int // size threshold that kicks off the cleaner
ci time.Duration // cleanup intervals
maxAttempts int // maximum backoff attempts prior to ejection
}

func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
b := &backoff{
mu: sync.Mutex{},
ct: sizeThreshold,
ci: cleanupInterval,
maxAttempts: maxAttempts,
info: make(map[peer.ID]*backoffHistory),
}

rand.Seed(time.Now().UnixNano()) // used for jitter
go b.cleanupLoop(ctx)

return b
}

func (b *backoff) updateAndGet(id peer.ID) (time.Duration, bool) {
b.mu.Lock()
defer b.mu.Unlock()

h, ok := b.info[id]
if !ok || time.Since(h.lastTried) > TimeToLive {
// first request goes immediately.
h = &backoffHistory{
duration: time.Duration(0),
attempts: 0,
}
} else if h.duration < MinBackoffDelay {
h.duration = MinBackoffDelay
} else if h.duration < MaxBackoffDelay {
jitter := rand.Intn(MaxBackoffJitterCoff)
h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
if h.duration > MaxBackoffDelay || h.duration < 0 {
h.duration = MaxBackoffDelay
}
}

Review comment (Collaborator): let's write this if/else sequence with a switch, will be nicer.
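As an illustration of this suggestion, the if/else chain above could be written as an expressionless switch over the same conditions. This is an editor's sketch of a drop-in replacement inside updateAndGet, not the code in this commit:

    switch {
    case !ok || time.Since(h.lastTried) > TimeToLive:
        // first request goes immediately
        h = &backoffHistory{duration: time.Duration(0), attempts: 0}
    case h.duration < MinBackoffDelay:
        h.duration = MinBackoffDelay
    case h.duration < MaxBackoffDelay:
        jitter := rand.Intn(MaxBackoffJitterCoff)
        h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
        if h.duration > MaxBackoffDelay || h.duration < 0 {
            h.duration = MaxBackoffDelay
        }
    }

A switch with no expression evaluates its cases in order, so the behaviour matches the original chain exactly.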

h.lastTried = time.Now()

Review comment (Collaborator): let's get the time after checking the max attempts, will avoid the gettimeofday call in that case.

Review comment (Contributor Author): Please read my reply to the comment below, as this part has changed.

h.attempts += 1
if h.attempts > b.maxAttempts {
delete(b.info, id)
return 0, false
}

b.info[id] = h
return h.duration, true
}
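As a rough illustration of the reviewer's note above about deferring the gettimeofday call, here is a hypothetical reordering of the tail of updateAndGet (an editor's sketch, not what this commit does), where the timestamp is only taken once the entry is known to survive the max-attempts check:

    h.attempts += 1
    if h.attempts > b.maxAttempts {
        delete(b.info, id)
        return 0, false
    }

    h.lastTried = time.Now() // only stamped for entries that are kept
    b.info[id] = h
    return h.duration, true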

func (b *backoff) cleanup() {
b.mu.Lock()
defer b.mu.Unlock()

for id, h := range b.info {
if time.Since(h.lastTried) > TimeToLive {
delete(b.info, id)
}
}
}

func (b *backoff) cleanupLoop(ctx context.Context) {
// use a single Ticker rather than time.Tick inside the loop, which would leak a new ticker on every iteration
ticker := time.NewTicker(b.ci)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return // pubsub shutting down
case <-ticker.C:
b.cleanup()
}
}
}
124 changes: 124 additions & 0 deletions backoff_test.go
@@ -0,0 +1,124 @@
package pubsub

import (
"context"
"fmt"
"math"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)

func TestBackoff_Update(t *testing.T) {
id1 := peer.ID("peer-1")
id2 := peer.ID("peer-2")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

size := 10
cleanupInterval := 5 * time.Second
maxBackoffAttempts := 10

b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)

if len(b.info) > 0 {
t.Fatal("non-empty info map for backoff")
}

if d, valid := b.updateAndGet(id1); d != time.Duration(0) || !valid {
t.Fatalf("invalid initialization: %v", d)
}
if d, valid := b.updateAndGet(id2); d != time.Duration(0) || !valid {
t.Fatalf("invalid initialization: %v", d)
}

for i := 0; i < maxBackoffAttempts-1; i++ {
got, valid := b.updateAndGet(id1)

if !valid {
t.Fatalf("update attempt invalidated")
}

expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) *
float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond))
if expected > MaxBackoffDelay {
expected = MaxBackoffDelay
}

if expected < got { // considering jitter, expected backoff must always be greater than or equal to actual.
t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
}
}

if len(b.info) != 2 {
t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info))
}

// trying once more beyond the threshold, hence expecting an invalidation
if _, valid := b.updateAndGet(id1); valid {
t.Fatal("update beyond max attempts did not invalidate")
}

// invalidated entry must be removed
if len(b.info) != 1 {
t.Fatalf("post-invalidation attempt, info map size mismatch, expected: %d, got: %d", 1, len(b.info))
}

got, valid := b.updateAndGet(id2)
if !valid {
t.Fatalf("update attempt invalidated")
}
if got != MinBackoffDelay {
t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
}

// sets lastTried of id2 to long enough ago that it resets upon the next try.
b.info[id2].lastTried = time.Now().Add(-TimeToLive)
got, valid = b.updateAndGet(id2)
if !valid {
t.Fatalf("update attempt invalidated")
}
if got != time.Duration(0) {
t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
}

}

func TestBackoff_Clean(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

size := 10
cleanupInterval := 2 * time.Second
maxBackoffAttempts := 100 // set the allowed attempts high so that this test exercises the cleanup logic rather than invalidation.
b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)

for i := 0; i < size; i++ {
id := peer.ID(fmt.Sprintf("peer-%d", i))
b.updateAndGet(id)
b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry
}

if len(b.info) != size {
t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info))
}

// waits for the cleanup loop to kick in
time.Sleep(2 * cleanupInterval)

// next update should trigger cleanup
got, valid := b.updateAndGet(peer.ID("some-new-peer"))
if !valid {
t.Fatalf("update attempt invalidated")
}
if got != time.Duration(0) {
t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got)
}

// except "some-new-peer" every other records must be cleaned up
if len(b.info) != 1 {
t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info))
}
}
18 changes: 18 additions & 0 deletions comm.go
@@ -3,7 +3,9 @@ package pubsub
import (
"bufio"
"context"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
@@ -121,6 +123,22 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}
}

func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) error {
delay, valid := p.deadPeerBackoff.updateAndGet(pid)
if !valid {
return fmt.Errorf("backoff attempts to %s expired after reaching maximum allowed", pid)
}

Review comment (Collaborator): let's return the error directly from updateAndGet instead of a bool, makes for simpler code.

Review comment (Contributor Author): I decoupled updating the backoff history from checking the number of backoff attempts. updateAndGet now only increments the backoff attempts, without returning a boolean or error to indicate that the maximum attempts have been reached. Instead, I introduced a separate peerExceededBackoffThreshold method that checks whether a peer's backoff attempts exceed the defined threshold. The reasons for this change are:

  1. We want to close the channel and forget the peer if the backoff attempts go beyond a threshold. updateAndGet is called on one goroutine, while the messages channel and the peers map live on a separate goroutine. So, if we let updateAndGet return a "backoff attempts exceeded" error and the goroutine it runs on then tries to close the messages channel and forget the peer in the peers map, we get a race condition between the two goroutines; the code is also vulnerable to a panic when the defer function runs, because it would try to close an already-closed channel, i.e., the messages channel closed as a result of updateAndGet.

  2. With this decoupling, we invoke peerExceededBackoffThreshold on the parent goroutine, so we give up on backing off right away when the peer exceeds the backoff threshold, without opening any channel or spawning a child goroutine for updateAndGet. Moreover, the peer is forgotten by the subsequent lines. Hence there is no exposure to race conditions, and fewer resources are allocated and deallocated.

Please let me know how that sounds.
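The peerExceededBackoffThreshold method described here is not part of this diff (it was added in later commits). As a sketch of the shape being proposed, assuming it is a read-only check over the same info map, it could look like the following; the merged implementation may differ:

    // peerExceededBackoffThreshold reports whether the peer's recorded backoff
    // attempts have gone beyond the configured maximum.
    // (Editor's sketch based on the discussion above.)
    func (b *backoff) peerExceededBackoffThreshold(id peer.ID) bool {
        b.mu.Lock()
        defer b.mu.Unlock()

        h, ok := b.info[id]
        if !ok {
            return false // no history yet, nothing exceeded
        }
        return h.attempts > b.maxAttempts
    }

Because this check would run on the parent goroutine (handleDeadPeers) before the messages channel is created or a writer goroutine is spawned, giving up on a peer never requires closing a channel owned by another goroutine.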

Review comment (Collaborator): sounds better than what we had :)


select {
case <-time.After(delay):
p.handleNewPeer(ctx, pid, outgoing)
case <-ctx.Done():
return fmt.Errorf("context cancelled")
}

return nil
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
pid := s.Conn().RemotePeer()
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
11 changes: 10 additions & 1 deletion pubsub.go
@@ -110,6 +110,8 @@ type PubSub struct {
peerDeadPrioLk sync.RWMutex
peerDeadMx sync.Mutex
peerDeadPend map[peer.ID]struct{}
// backoff for retrying new connections to dead peers
deadPeerBackoff *backoff

// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}
@@ -252,6 +254,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
newPeerError: make(chan peer.ID),
peerDead: make(chan struct{}, 1),
peerDeadPend: make(map[peer.ID]struct{}),
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
@@ -686,7 +689,13 @@ func (p *PubSub) handleDeadPeers() {
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(p.ctx, pid, messages)
go func() {
err := p.handleNewPeerWithBackoff(p.ctx, pid, messages)
if err != nil {
log.Warnf("could not handle backoff to new peer %s", err)
close(messages)
}
}()
p.peers[pid] = messages
continue
}