Skip to content

Commit

Permalink
Add in Backoff Check
Browse files Browse the repository at this point in the history
  • Loading branch information
nisdas authored and vyzo committed Feb 8, 2022
1 parent e02b347 commit aeb30a2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
16 changes: 11 additions & 5 deletions gossipsub.go
Expand Up @@ -1036,21 +1036,25 @@ func (gs *GossipSubRouter) Join(topic string) {

gmap, ok = gs.fanout[topic]
if ok {
backoff := gs.backoff[topic]
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
for p := range gmap {
if gs.score.Score(p) < 0 {
_, doBackOff := backoff[p]
if gs.score.Score(p) < 0 || doBackOff {
delete(gmap, p)
}
}

if len(gmap) < gs.params.D {
// we need more peers; eager, as this would get fixed in the next heartbeat
more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool {
// filter our current peers, direct peers, and peers with negative scores
// filter our current peers, direct peers, peers we are backing off, and
// peers with negative scores
_, inMesh := gmap[p]
_, direct := gs.direct[p]
return !inMesh && !direct && gs.score.Score(p) >= 0
_, doBackOff := backoff[p]
return !inMesh && !direct && !doBackOff && gs.score.Score(p) >= 0
})
for _, p := range more {
gmap[p] = struct{}{}
Expand All @@ -1060,10 +1064,12 @@ func (gs *GossipSubRouter) Join(topic string) {
delete(gs.fanout, topic)
delete(gs.lastpub, topic)
} else {
backoff := gs.backoff[topic]
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
// filter direct peers and peers with negative score
// filter direct peers, peers we are backing off and peers with negative score
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= 0
_, doBackOff := backoff[p]
return !direct && !doBackOff && gs.score.Score(p) >= 0
})
gmap = peerListToMap(peers)
gs.mesh[topic] = gmap
Expand Down
45 changes: 45 additions & 0 deletions gossipsub_test.go
Expand Up @@ -1862,6 +1862,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {
}
}

func TestGossipSubJoinTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := getNetHosts(t, ctx, 3)
psubs := []*PubSub{
getGossipsub(ctx, h[0]),
getGossipsub(ctx, h[1]),
getGossipsub(ctx, h[2]),
}

connect(t, h[0], h[1])
connect(t, h[0], h[2])

router0 := psubs[0].rt.(*GossipSubRouter)

// Add in backoff for peer.
peerMap := make(map[peer.ID]time.Time)
peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)

router0.backoff["test"] = peerMap

// Join all peers
var subs []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test")
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub)
}

time.Sleep(time.Second)

meshMap := router0.mesh["test"]
if len(meshMap) != 1 {
t.Fatalf("Unexpect peer included in the mesh")
}

_, ok := meshMap[h[1].ID()]
if ok {
t.Fatalf("Peer that was to be backed off is included in the mesh")
}
}

type sybilSquatter struct {
h host.Host
}
Expand Down

0 comments on commit aeb30a2

Please sign in to comment.