Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossipsub: Unsubscribe backoff #488

Merged
merged 6 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 30 additions & 15 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
GossipSubFanoutTTL = 60 * time.Second
GossipSubPrunePeers = 16
GossipSubPruneBackoff = time.Minute
GossipSubUnsubscribeBackoff = 10 * time.Second
GossipSubConnectors = 8
GossipSubMaxPendingConnections = 128
GossipSubConnectionTimeout = 30 * time.Second
Expand Down Expand Up @@ -153,6 +154,11 @@ type GossipSubParams struct {
// before attempting to re-graft.
PruneBackoff time.Duration

// UnsubscribeBackoff controls the backoff time to use when unsuscribing
// from a topic. A peer should not resubscribe to this topic before this
// duration.
UnsubscribeBackoff time.Duration

// Connectors controls the number of active connection attempts for peers obtained through PX.
Connectors int

Expand Down Expand Up @@ -244,6 +250,7 @@ func DefaultGossipSubParams() GossipSubParams {
FanoutTTL: GossipSubFanoutTTL,
PrunePeers: GossipSubPrunePeers,
PruneBackoff: GossipSubPruneBackoff,
UnsubscribeBackoff: GossipSubUnsubscribeBackoff,
Connectors: GossipSubConnectors,
MaxPendingConnections: GossipSubMaxPendingConnections,
ConnectionTimeout: GossipSubConnectionTimeout,
Expand Down Expand Up @@ -777,7 +784,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
gs.score.AddPenalty(p, 1)
}
// refresh the backoff
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
prune = append(prune, topic)
continue
}
Expand All @@ -791,7 +798,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
// but we won't PX to them
doPX = false
// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
continue
}

Expand All @@ -800,7 +807,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
// mesh takeover attacks combined with love bombing
if len(peers) >= gs.params.Dhi && !gs.outbound[p] {
prune = append(prune, topic)
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
continue
}

Expand All @@ -815,7 +822,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.

cprune := make([]*pb.ControlPrune, 0, len(prune))
for _, topic := range prune {
cprune = append(cprune, gs.makePrune(p, topic, doPX))
cprune = append(cprune, gs.makePrune(p, topic, doPX, false))
}

return cprune
Expand All @@ -839,7 +846,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
if backoff > 0 {
gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
} else {
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
}

px := prune.GetPeers()
Expand All @@ -855,8 +862,12 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
}
}

func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
gs.doAddBackoff(p, topic, gs.params.PruneBackoff)
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
backoff := gs.params.PruneBackoff
if isUnsubscribe {
backoff = gs.params.UnsubscribeBackoff
}
gs.doAddBackoff(p, topic, backoff)
}

func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
Expand Down Expand Up @@ -1096,11 +1107,11 @@ func (gs *GossipSubRouter) Leave(topic string) {
for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.sendPrune(p, topic, true)
// Add a backoff to this peer to prevent us from eagerly
// re-grafting this peer into our mesh if we rejoin this
// topic before the backoff period ends.
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, true)
}
}

Expand All @@ -1110,8 +1121,8 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string, isUnsubscribe bool) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX, isUnsubscribe)}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendRPC(p, out)
}
Expand Down Expand Up @@ -1368,7 +1379,7 @@ func (gs *GossipSubRouter) heartbeat() {
prunePeer := func(p peer.ID) {
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.addBackoff(p, topic)
gs.addBackoff(p, topic, false)
topics := toprune[p]
toprune[p] = append(topics, topic)
}
Expand Down Expand Up @@ -1668,7 +1679,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
}
}

Expand All @@ -1679,7 +1690,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
}

out := rpcWithControl(nil, nil, nil, nil, prune)
Expand Down Expand Up @@ -1834,13 +1845,17 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
}
}

func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune {
if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
}

backoff := uint64(gs.params.PruneBackoff / time.Second)
if isUnsubscribe {
backoff = uint64(gs.params.UnsubscribeBackoff / time.Second)
}

var px []*pb.PeerInfo
if doPX {
// select peers for Peer eXchange
Expand Down
157 changes: 140 additions & 17 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -581,6 +582,104 @@ func TestGossipsubPrune(t *testing.T) {
}
}

func TestGossipsubPruneBackoffTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)

// App specific score that we'll change later.
currentScoreForHost0 := int32(0)

params := DefaultGossipSubParams()
params.HeartbeatInitialDelay = time.Millisecond * 10
params.HeartbeatInterval = time.Millisecond * 100

psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params), WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() {
return float64(atomic.LoadInt32(&currentScoreForHost0))
} else {
return 0
}
},
AppSpecificWeight: 1,
DecayInterval: time.Second,
DecayToZero: 0.01,
},
&PeerScoreThresholds{
GossipThreshold: -1,
PublishThreshold: -1,
GraylistThreshold: -1,
}))

var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
}

connectAll(t, hosts)

// wait for heartbeats to build mesh
time.Sleep(time.Second)

pruneTime := time.Now()
// Flip the score. Host 0 should be pruned from everyone
atomic.StoreInt32(&currentScoreForHost0, -1000)

// wait for heartbeats to run and prune
time.Sleep(time.Second)

wg := sync.WaitGroup{}
var missingBackoffs uint32 = 0
for i := 1; i < 10; i++ {
wg.Add(1)
// Copy i so this func keeps the correct value in the closure.
var idx = i
// Run this check in the eval thunk so that we don't step over the heartbeat goroutine and trigger a race.
psubs[idx].rt.(*GossipSubRouter).p.eval <- func() {
defer wg.Done()
backoff, ok := psubs[idx].rt.(*GossipSubRouter).backoff["foobar"][hosts[0].ID()]
if !ok {
atomic.AddUint32(&missingBackoffs, 1)
}
if ok && backoff.Sub(pruneTime)-params.PruneBackoff > time.Second {
t.Errorf("backoff time should be equal to prune backoff (with some slack) was %v", backoff.Sub(pruneTime)-params.PruneBackoff)
}
}
}
wg.Wait()

// Sometimes not all the peers will have updated their backoffs by this point. If the majority haven't we'll fail this test.
if missingBackoffs >= 5 {
t.Errorf("missing too many backoffs: %v", missingBackoffs)
}

for i := 0; i < 10; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

// Don't publish from host 0, since everyone should have pruned it.
owner := rand.Intn(len(psubs)-1) + 1

psubs[owner].Publish("foobar", msg)

for _, sub := range msgs[1:] {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}

func TestGossipsubGraft(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1839,27 +1938,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {

time.Sleep(time.Second)

psubs[0].rt.Leave("test")
time.Sleep(time.Second)
peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 0")
}
_, ok := peerMap[h[1].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
leaveTime := time.Now()
done := make(chan struct{})

psubs[0].rt.(*GossipSubRouter).p.eval <- func() {
defer close(done)
psubs[0].rt.Leave("test")
time.Sleep(time.Second)
peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 0")
}
_, ok := peerMap[h[1].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
}

backoffTime := peerMap[h[1].ID()].Sub(leaveTime)
// Check that the backoff time is roughly the unsubscribebackoff time (with a slack of 1s)
if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
}
}
<-done

done = make(chan struct{})
// Ensure that remote peer 1 also applies the backoff appropriately
// for peer 0.
peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap2) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 1")
}
_, ok = peerMap2[h[0].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
psubs[1].rt.(*GossipSubRouter).p.eval <- func() {
defer close(done)
peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
if len(peerMap2) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 1")
}
_, ok := peerMap2[h[0].ID()]
if !ok {
t.Errorf("Expected peer does not exist in the backoff map")
}

backoffTime := peerMap2[h[0].ID()].Sub(leaveTime)
// Check that the backoff time is roughly the unsubscribebackoff time (with a slack of 1s)
if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
}
}
<-done
}

func TestGossipSubJoinTopic(t *testing.T) {
Expand All @@ -1880,7 +2003,7 @@ func TestGossipSubJoinTopic(t *testing.T) {

// Add in backoff for peer.
peerMap := make(map[peer.ID]time.Time)
peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)
peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff)

router0.backoff["test"] = peerMap

Expand Down