Add in Backoff Filter When Grafting New Peers #474

Merged
merged 1 commit on Feb 8, 2022
16 changes: 11 additions & 5 deletions gossipsub.go
@@ -1036,21 +1036,25 @@ func (gs *GossipSubRouter) Join(topic string) {

 	gmap, ok = gs.fanout[topic]
 	if ok {
+		backoff := gs.backoff[topic]
 		// these peers have a score above the publish threshold, which may be negative
 		// so drop the ones with a negative score
 		for p := range gmap {
-			if gs.score.Score(p) < 0 {
+			_, doBackOff := backoff[p]
+			if gs.score.Score(p) < 0 || doBackOff {
 				delete(gmap, p)
 			}
 		}

 		if len(gmap) < gs.params.D {
 			// we need more peers; eager, as this would get fixed in the next heartbeat
 			more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool {
-				// filter our current peers, direct peers, and peers with negative scores
+				// filter our current peers, direct peers, peers we are backing off, and
+				// peers with negative scores
 				_, inMesh := gmap[p]
 				_, direct := gs.direct[p]
-				return !inMesh && !direct && gs.score.Score(p) >= 0
+				_, doBackOff := backoff[p]
+				return !inMesh && !direct && !doBackOff && gs.score.Score(p) >= 0
 			})
 			for _, p := range more {
 				gmap[p] = struct{}{}
@@ -1060,10 +1064,12 @@ func (gs *GossipSubRouter) Join(topic string) {
 		delete(gs.fanout, topic)
 		delete(gs.lastpub, topic)
 	} else {
+		backoff := gs.backoff[topic]
 		peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
-			// filter direct peers and peers with negative score
+			// filter direct peers, peers we are backing off and peers with negative score
 			_, direct := gs.direct[p]
-			return !direct && gs.score.Score(p) >= 0
+			_, doBackOff := backoff[p]
+			return !direct && !doBackOff && gs.score.Score(p) >= 0
 		})
 		gmap = peerListToMap(peers)
 		gs.mesh[topic] = gmap
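Both branches of Join() now consult gs.backoff[topic] before grafting: a peer that still has an entry in the topic's backoff map is skipped, whether it comes from the existing fanout set or from a fresh getPeers selection. As an illustrative sketch (not code from this PR), the filtering behaviour looks roughly like this; localPeerID is a stand-in for libp2p's peer.ID, and the backoff map mirrors the per-topic map[peer.ID]time.Time kept in GossipSubRouter.backoff:

package main

import (
	"fmt"
	"time"
)

// localPeerID is a stand-in for libp2p's peer.ID in this illustrative sketch.
type localPeerID string

// filterBackedOff mimics the Join()-time check added in this PR: a candidate
// that still has an entry in the topic's backoff map is never grafted,
// regardless of its score.
func filterBackedOff(candidates []localPeerID, backoff map[localPeerID]time.Time) []localPeerID {
	var keep []localPeerID
	for _, p := range candidates {
		if _, doBackOff := backoff[p]; doBackOff {
			continue // recently pruned and still backing off; skip it
		}
		keep = append(keep, p)
	}
	return keep
}

func main() {
	backoff := map[localPeerID]time.Time{
		"peerB": time.Now().Add(time.Minute), // still inside its backoff window
	}
	fmt.Println(filterBackedOff([]localPeerID{"peerA", "peerB", "peerC"}, backoff))
	// prints [peerA peerC]
}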
45 changes: 45 additions & 0 deletions gossipsub_test.go
@@ -1862,6 +1862,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {
 	}
 }

+func TestGossipSubJoinTopic(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	h := getNetHosts(t, ctx, 3)
+	psubs := []*PubSub{
+		getGossipsub(ctx, h[0]),
+		getGossipsub(ctx, h[1]),
+		getGossipsub(ctx, h[2]),
+	}
+
+	connect(t, h[0], h[1])
+	connect(t, h[0], h[2])
+
+	router0 := psubs[0].rt.(*GossipSubRouter)
+
+	// Add in backoff for peer.
+	peerMap := make(map[peer.ID]time.Time)
+	peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)
+
+	router0.backoff["test"] = peerMap
+
+	// Join all peers
+	var subs []*Subscription
+	for _, ps := range psubs {
+		sub, err := ps.Subscribe("test")
+		if err != nil {
+			t.Fatal(err)
+		}
+		subs = append(subs, sub)
+	}
+
+	time.Sleep(time.Second)
+
+	meshMap := router0.mesh["test"]
+	if len(meshMap) != 1 {
+		t.Fatalf("Unexpected peer included in the mesh")
+	}
+
+	_, ok := meshMap[h[1].ID()]
+	if ok {
+		t.Fatalf("Peer that was to be backed off is included in the mesh")
+	}
+}
+
 type sybilSquatter struct {
 	h host.Host
 }
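The Join()-time check only tests for key presence in gs.backoff[topic]; the expiry time stored as the value is not consulted at that point, and expired entries are presumably cleared elsewhere (e.g. during the heartbeat). As an illustrative sketch (not code from this PR or the library), an expiry-aware variant of the same check, using a hypothetical isBackedOff helper, might look like this:

package main

import (
	"fmt"
	"time"
)

// localPeerID is a stand-in for libp2p's peer.ID in this illustrative sketch.
type localPeerID string

// isBackedOff is a hypothetical, expiry-aware variant of the presence-only
// check used in the PR: a peer counts as backed off only until its recorded
// expiry time has passed.
func isBackedOff(p localPeerID, backoff map[localPeerID]time.Time, now time.Time) bool {
	expire, ok := backoff[p]
	return ok && now.Before(expire)
}

func main() {
	// Analogous to the test setup, which stores time.Now().Add(PruneBackoff).
	backoff := map[localPeerID]time.Time{
		"peerB": time.Now().Add(time.Minute),
	}
	fmt.Println(isBackedOff("peerB", backoff, time.Now())) // true: still inside the window
	fmt.Println(isBackedOff("peerA", backoff, time.Now())) // false: no backoff entry
}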