Gossipsub: Unsubscribe backoff (#488)
* Implement Unsubscribe backoff

* Add test to check that prune backoff time is used

* Update which backoff to use in TestGossipSubJoinTopic test

* Fix race in TestGossipSubLeaveTopic

* Wait for all the backoff checks, and check that we aren't missing too many

* Remove open question
MarcoPolo committed Jun 3, 2022
1 parent 06b5ba4 commit 68cdae0
Showing 2 changed files with 170 additions and 32 deletions.
45 changes: 30 additions & 15 deletions gossipsub.go
@@ -45,6 +45,7 @@ var (
GossipSubFanoutTTL = 60 * time.Second
GossipSubPrunePeers = 16
GossipSubPruneBackoff = time.Minute
GossipSubUnsubscribeBackoff = 10 * time.Second
GossipSubConnectors = 8
GossipSubMaxPendingConnections = 128
GossipSubConnectionTimeout = 30 * time.Second
@@ -153,6 +154,11 @@ type GossipSubParams struct {
// before attempting to re-graft.
PruneBackoff time.Duration

// UnsubscribeBackoff controls the backoff time to use when unsubscribing
// from a topic. A peer should not resubscribe to this topic before this
// duration.
UnsubscribeBackoff time.Duration

// Connectors controls the number of active connection attempts for peers obtained through PX.
Connectors int

@@ -244,6 +250,7 @@ func DefaultGossipSubParams() GossipSubParams {
FanoutTTL: GossipSubFanoutTTL,
PrunePeers: GossipSubPrunePeers,
PruneBackoff: GossipSubPruneBackoff,
UnsubscribeBackoff: GossipSubUnsubscribeBackoff,
Connectors: GossipSubConnectors,
MaxPendingConnections: GossipSubMaxPendingConnections,
ConnectionTimeout: GossipSubConnectionTimeout,
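
For context, a minimal sketch of how a consumer could tune the new knob through the options API used elsewhere in this diff (DefaultGossipSubParams, WithGossipSubParams). The import paths are assumptions and vary by go-libp2p version:

```go
package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// newGossipSub wires up a router with a custom unsubscribe backoff.
// Peers pruned because we leave a topic are asked to wait this long
// before re-grafting us; ordinary prunes keep the longer PruneBackoff.
func newGossipSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
	params := pubsub.DefaultGossipSubParams()
	params.PruneBackoff = time.Minute            // default, shown above
	params.UnsubscribeBackoff = 30 * time.Second // default is 10s
	return pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(params))
}
```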
@@ -777,7 +784,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
gs.score.AddPenalty(p, 1)
}
// refresh the backoff
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
prune = append(prune, topic)
continue
}
@@ -791,7 +798,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
// but we won't PX to them
doPX = false
// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
continue
}

@@ -800,7 +807,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
// mesh takeover attacks combined with love bombing
if len(peers) >= gs.params.Dhi && !gs.outbound[p] {
prune = append(prune, topic)
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
continue
}

@@ -815,7 +822,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.

cprune := make([]*pb.ControlPrune, 0, len(prune))
for _, topic := range prune {
cprune = append(cprune, gs.makePrune(p, topic, doPX))
cprune = append(cprune, gs.makePrune(p, topic, doPX, false))
}

return cprune
@@ -839,7 +846,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
if backoff > 0 {
gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
} else {
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
}

px := prune.GetPeers()
@@ -855,8 +862,12 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
}
}

func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
gs.doAddBackoff(p, topic, gs.params.PruneBackoff)
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
backoff := gs.params.PruneBackoff
if isUnsubscribe {
backoff = gs.params.UnsubscribeBackoff
}
gs.doAddBackoff(p, topic, backoff)
}

func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
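
The body of doAddBackoff is collapsed above; the following is a rough, hypothetical mirror of the bookkeeping it performs, inferred from the usage in handleGraft and the assertions in the tests below (details such as never shortening an existing entry are assumptions):

```go
package main

import (
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

// backoffTracker is a simplified stand-in for the router's backoff map:
// backoff[topic][peer] holds the earliest time a re-GRAFT is acceptable.
type backoffTracker struct {
	backoff map[string]map[peer.ID]time.Time
}

func (b *backoffTracker) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
	peers, ok := b.backoff[topic]
	if !ok {
		peers = make(map[peer.ID]time.Time)
		b.backoff[topic] = peers
	}
	expire := time.Now().Add(interval)
	if peers[p].Before(expire) { // assumed: never shorten an existing backoff
		peers[p] = expire
	}
}
```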
@@ -1096,11 +1107,11 @@ func (gs *GossipSubRouter) Leave(topic string) {
for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.sendPrune(p, topic, true)
// Add a backoff to this peer to prevent us from eagerly
// re-grafting this peer into our mesh if we rejoin this
// topic before the backoff period ends.
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, true)
}
}
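
In application terms, a quick unsubscribe/resubscribe cycle no longer has to wait out the minute-long PruneBackoff. A hedged usage sketch against the public topic API (Join, Subscribe, Close), which this commit does not itself touch:

```go
// resubscribe leaves and re-joins a topic. Closing the topic drives the
// router's Leave path above, which PRUNEs mesh peers with the shorter
// UnsubscribeBackoff (10s default) rather than PruneBackoff (1min).
func resubscribe(ps *pubsub.PubSub, name string) (*pubsub.Topic, error) {
	topic, err := ps.Join(name)
	if err != nil {
		return nil, err
	}
	sub, err := topic.Subscribe()
	if err != nil {
		return nil, err
	}
	sub.Cancel()
	if err := topic.Close(); err != nil {
		return nil, err
	}
	// A re-join inside the backoff window will not immediately re-graft
	// the peers we just pruned; the mesh re-forms once the backoff expires.
	return ps.Join(name)
}
```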

@@ -1110,8 +1121,8 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string, isUnsubscribe bool) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX, isUnsubscribe)}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendRPC(p, out)
}
@@ -1368,7 +1379,7 @@ func (gs *GossipSubRouter) heartbeat() {
prunePeer := func(p peer.ID) {
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
topics := toprune[p]
toprune[p] = append(topics, topic)
}
@@ -1668,7 +1679,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
}
}

@@ -1679,7 +1690,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
}

out := rpcWithControl(nil, nil, nil, nil, prune)
@@ -1834,13 +1845,17 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
}
}

func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune {
if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
}

backoff := uint64(gs.params.PruneBackoff / time.Second)
if isUnsubscribe {
backoff = uint64(gs.params.UnsubscribeBackoff / time.Second)
}

var px []*pb.PeerInfo
if doPX {
// select peers for Peer eXchange
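
One wire-level detail: makePrune encodes the backoff as whole seconds, so sub-second values truncate to zero and the receiver falls back to its own default in handlePrune. A sketch of that round trip, using the pb.ControlPrune accessors visible in this diff:

```go
package main

import (
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// pruneBackoffRoundTrip shows how a configured backoff survives (or
// doesn't) the trip through a PRUNE control message.
func pruneBackoffRoundTrip(configured time.Duration) time.Duration {
	// Sender side, as in makePrune: whole seconds only.
	topic := "foobar"
	backoff := uint64(configured / time.Second)
	prune := &pb.ControlPrune{TopicID: &topic, Backoff: &backoff}

	// Receiver side, as in handlePrune: decode, defaulting when zero/absent.
	if b := prune.GetBackoff(); b > 0 {
		return time.Duration(b) * time.Second
	}
	return pubsub.GossipSubPruneBackoff // receiver applies its own default
}
```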
157 changes: 140 additions & 17 deletions gossipsub_test.go
@@ -7,6 +7,7 @@ import (
"io"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

@@ -581,6 +582,104 @@ func TestGossipsubPrune(t *testing.T) {
}
}

func TestGossipsubPruneBackoffTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)

// App specific score that we'll change later.
currentScoreForHost0 := int32(0)

params := DefaultGossipSubParams()
params.HeartbeatInitialDelay = time.Millisecond * 10
params.HeartbeatInterval = time.Millisecond * 100

psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params), WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() {
return float64(atomic.LoadInt32(&currentScoreForHost0))
} else {
return 0
}
},
AppSpecificWeight: 1,
DecayInterval: time.Second,
DecayToZero: 0.01,
},
&PeerScoreThresholds{
GossipThreshold: -1,
PublishThreshold: -1,
GraylistThreshold: -1,
}))

var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
}

connectAll(t, hosts)

// wait for heartbeats to build mesh
time.Sleep(time.Second)

pruneTime := time.Now()
// Flip the score. Host 0 should be pruned from everyone
atomic.StoreInt32(&currentScoreForHost0, -1000)

// wait for heartbeats to run and prune
time.Sleep(time.Second)

wg := sync.WaitGroup{}
var missingBackoffs uint32 = 0
for i := 1; i < 10; i++ {
wg.Add(1)
// Copy i so this func keeps the correct value in the closure.
idx := i
// Run this check in the eval thunk so that we don't step over the heartbeat goroutine and trigger a race.
psubs[idx].rt.(*GossipSubRouter).p.eval <- func() {
defer wg.Done()
backoff, ok := psubs[idx].rt.(*GossipSubRouter).backoff["foobar"][hosts[0].ID()]
if !ok {
atomic.AddUint32(&missingBackoffs, 1)
}
if ok && backoff.Sub(pruneTime)-params.PruneBackoff > time.Second {
t.Errorf("backoff time should be equal to prune backoff (with some slack) was %v", backoff.Sub(pruneTime)-params.PruneBackoff)
}
}
}
wg.Wait()

// Sometimes not all the peers will have updated their backoffs by this point. If the majority haven't we'll fail this test.
if missingBackoffs >= 5 {
t.Errorf("missing too many backoffs: %v", missingBackoffs)
}

for i := 0; i < 10; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

// Don't publish from host 0, since everyone should have pruned it.
owner := rand.Intn(len(psubs)-1) + 1

psubs[owner].Publish("foobar", msg)

for _, sub := range msgs[1:] {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}
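
The eval channel trick used twice in this test is worth distilling: router state is only safe to read on the pubsub event loop, so the test ships a closure into it instead of reading from the test goroutine. A minimal sketch that, like the test itself, relies on unexported fields and therefore only compiles inside this package:

```go
// inspectBackoff reads a backoff entry on the event loop, avoiding a data
// race with the heartbeat goroutine, and hands the result back on a channel.
func inspectBackoff(ps *PubSub, topic string, p peer.ID) (time.Time, bool) {
	type entry struct {
		expire time.Time
		ok     bool
	}
	ch := make(chan entry, 1)
	rt := ps.rt.(*GossipSubRouter)
	rt.p.eval <- func() {
		b, ok := rt.backoff[topic][p]
		ch <- entry{b, ok}
	}
	e := <-ch
	return e.expire, e.ok
}
```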

func TestGossipsubGraft(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -1839,27 +1938,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {

time.Sleep(time.Second)

psubs[0].rt.Leave("test")
time.Sleep(time.Second)
peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 0")
}
_, ok := peerMap[h[1].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
leaveTime := time.Now()
done := make(chan struct{})

psubs[0].rt.(*GossipSubRouter).p.eval <- func() {
defer close(done)
psubs[0].rt.Leave("test")
time.Sleep(time.Second)
peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 0")
}
_, ok := peerMap[h[1].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
}

backoffTime := peerMap[h[1].ID()].Sub(leaveTime)
// Check that the backoff time is roughly the UnsubscribeBackoff time (with a slack of 1s)
if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
}
}
<-done

done = make(chan struct{})
// Ensure that remote peer 1 also applies the backoff appropriately
// for peer 0.
peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap2) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 1")
}
_, ok = peerMap2[h[0].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
psubs[1].rt.(*GossipSubRouter).p.eval <- func() {
defer close(done)
peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap2) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 1")
}
_, ok := peerMap2[h[0].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
}

backoffTime := peerMap2[h[0].ID()].Sub(leaveTime)
// Check that the backoff time is roughly the UnsubscribeBackoff time (with a slack of 1s)
if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
}
}
<-done
}

func TestGossipSubJoinTopic(t *testing.T) {
@@ -1880,7 +2003,7 @@ func TestGossipSubJoinTopic(t *testing.T) {

// Add in backoff for peer.
peerMap := make(map[peer.ID]time.Time)
peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)
peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff)

router0.backoff["test"] = peerMap

