diff --git a/.gitignore b/.gitignore index 5b98de78..8707669a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ cover.out prof.out go-floodsub.test + +.idea/ diff --git a/backoff.go b/backoff.go new file mode 100644 index 00000000..1a1b6ecf --- /dev/null +++ b/backoff.go @@ -0,0 +1,107 @@ +package pubsub + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +const ( + MinBackoffDelay = 100 * time.Millisecond + MaxBackoffDelay = 10 * time.Second + TimeToLive = 10 * time.Minute + BackoffCleanupInterval = 1 * time.Minute + BackoffMultiplier = 2 + MaxBackoffJitterCoff = 100 + MaxBackoffAttempts = 4 +) + +type backoffHistory struct { + duration time.Duration + lastTried time.Time + attempts int +} + +type backoff struct { + mu sync.Mutex + info map[peer.ID]*backoffHistory + ct int // size threshold that kicks off the cleaner + ci time.Duration // cleanup intervals + maxAttempts int // maximum backoff attempts prior to ejection +} + +func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff { + b := &backoff{ + mu: sync.Mutex{}, + ct: sizeThreshold, + ci: cleanupInterval, + maxAttempts: maxAttempts, + info: make(map[peer.ID]*backoffHistory), + } + + rand.Seed(time.Now().UnixNano()) // used for jitter + go b.cleanupLoop(ctx) + + return b +} + +func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) { + b.mu.Lock() + defer b.mu.Unlock() + + h, ok := b.info[id] + switch { + case !ok || time.Since(h.lastTried) > TimeToLive: + // first request goes immediately. + h = &backoffHistory{ + duration: time.Duration(0), + attempts: 0, + } + case h.attempts >= b.maxAttempts: + return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id) + + case h.duration < MinBackoffDelay: + h.duration = MinBackoffDelay + + case h.duration < MaxBackoffDelay: + jitter := rand.Intn(MaxBackoffJitterCoff) + h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond + if h.duration > MaxBackoffDelay || h.duration < 0 { + h.duration = MaxBackoffDelay + } + } + + h.attempts += 1 + h.lastTried = time.Now() + b.info[id] = h + return h.duration, nil +} + +func (b *backoff) cleanup() { + b.mu.Lock() + defer b.mu.Unlock() + + for id, h := range b.info { + if time.Since(h.lastTried) > TimeToLive { + delete(b.info, id) + } + } +} + +func (b *backoff) cleanupLoop(ctx context.Context) { + ticker := time.NewTicker(b.ci) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return // pubsub shutting down + case <-ticker.C: + b.cleanup() + } + } +} diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 00000000..5c72fcfa --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,122 @@ +package pubsub + +import ( + "context" + "fmt" + "math" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +func TestBackoff_Update(t *testing.T) { + id1 := peer.ID("peer-1") + id2 := peer.ID("peer-2") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + size := 10 + cleanupInterval := 5 * time.Second + maxBackoffAttempts := 10 + + b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts) + + if len(b.info) > 0 { + t.Fatal("non-empty info map for backoff") + } + + if d, err := b.updateAndGet(id1); d != time.Duration(0) || err != nil { + t.Fatalf("invalid initialization: %v, \t, %s", d, err) + } + if d, err := b.updateAndGet(id2); d != time.Duration(0) || err != nil { + t.Fatalf("invalid initialization: %v, \t, %s", d, err) + } + + for i := 0; i < maxBackoffAttempts-1; i++ { + got, err := b.updateAndGet(id1) + if err != nil { + t.Fatalf("unexpected error post update: %s", err) + } + + expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) * + float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond)) + if expected > MaxBackoffDelay { + expected = MaxBackoffDelay + } + + if expected < got { // considering jitter, expected backoff must always be greater than or equal to actual. + t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got) + } + } + + // trying once more beyond the threshold, hence expecting exceeding threshold + if _, err := b.updateAndGet(id1); err == nil { + t.Fatalf("expected an error for going beyond threshold but got nil") + } + + got, err := b.updateAndGet(id2) + if err != nil { + t.Fatalf("unexpected error post update: %s", err) + } + if got != MinBackoffDelay { + t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got) + } + + // sets last tried of id2 to long ago that it resets back upon next try. + // update attempts on id2 are below threshold, hence peer should never go beyond backoff attempt threshold. + b.info[id2].lastTried = time.Now().Add(-TimeToLive) + got, err = b.updateAndGet(id2) + if err != nil { + t.Fatalf("unexpected error post update: %s", err) + } + if got != time.Duration(0) { + t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got) + } + + if len(b.info) != 2 { + t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info)) + } + +} + +func TestBackoff_Clean(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + size := 10 + cleanupInterval := 2 * time.Second + maxBackoffAttempts := 100 // setting attempts to a high number hence testing cleanup logic. + b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts) + + for i := 0; i < size; i++ { + id := peer.ID(fmt.Sprintf("peer-%d", i)) + _, err := b.updateAndGet(id) + if err != nil { + t.Fatalf("unexpected error post update: %s", err) + } + b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry + } + + if len(b.info) != size { + t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info)) + } + + // waits for a cleanup loop to kick-in + time.Sleep(2 * cleanupInterval) + + // next update should trigger cleanup + got, err := b.updateAndGet(peer.ID("some-new-peer")) + if err != nil { + t.Fatalf("unexpected error post update: %s", err) + } + if got != time.Duration(0) { + t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got) + } + + // except "some-new-peer" every other records must be cleaned up + if len(b.info) != 1 { + t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info)) + } +} diff --git a/comm.go b/comm.go index 5a9d0d85..a135a60a 100644 --- a/comm.go +++ b/comm.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "io" + "time" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -121,6 +122,15 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan } } +func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) { + select { + case <-time.After(backoff): + p.handleNewPeer(ctx, pid, outgoing) + case <-ctx.Done(): + return + } +} + func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { pid := s.Conn().RemotePeer() r := protoio.NewDelimitedReader(s, p.maxMessageSize) diff --git a/pubsub.go b/pubsub.go index 901314b9..234142b1 100644 --- a/pubsub.go +++ b/pubsub.go @@ -110,6 +110,8 @@ type PubSub struct { peerDeadPrioLk sync.RWMutex peerDeadMx sync.Mutex peerDeadPend map[peer.ID]struct{} + // backoff for retrying new connections to dead peers + deadPeerBackoff *backoff // The set of topics we are subscribed to mySubs map[string]map[*Subscription]struct{} @@ -252,6 +254,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option newPeerError: make(chan peer.ID), peerDead: make(chan struct{}, 1), peerDeadPend: make(map[peer.ID]struct{}), + deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), @@ -681,12 +684,18 @@ func (p *PubSub) handleDeadPeers() { close(ch) if p.host.Network().Connectedness(pid) == network.Connected { + backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid) + if err != nil { + log.Debug(err) + continue + } + // still connected, must be a duplicate connection being closed. // we respawn the writer as we need to ensure there is a stream active log.Debugf("peer declared dead but still connected; respawning writer: %s", pid) messages := make(chan *RPC, p.peerOutboundQueueSize) messages <- p.getHelloPacket() - go p.handleNewPeer(p.ctx, pid, messages) + go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, messages) p.peers[pid] = messages continue }