diff --git a/gossipsub.go b/gossipsub.go index 2e92d35c..ffdfa7dd 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1036,10 +1036,12 @@ func (gs *GossipSubRouter) Join(topic string) { gmap, ok = gs.fanout[topic] if ok { + backoff := gs.backoff[topic] // these peers have a score above the publish threshold, which may be negative // so drop the ones with a negative score for p := range gmap { - if gs.score.Score(p) < 0 { + _, doBackOff := backoff[p] + if gs.score.Score(p) < 0 || doBackOff { delete(gmap, p) } } @@ -1047,10 +1049,12 @@ func (gs *GossipSubRouter) Join(topic string) { if len(gmap) < gs.params.D { // we need more peers; eager, as this would get fixed in the next heartbeat more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool { - // filter our current peers, direct peers, and peers with negative scores + // filter our current peers, direct peers, peers we are backing off, and + // peers with negative scores _, inMesh := gmap[p] _, direct := gs.direct[p] - return !inMesh && !direct && gs.score.Score(p) >= 0 + _, doBackOff := backoff[p] + return !inMesh && !direct && !doBackOff && gs.score.Score(p) >= 0 }) for _, p := range more { gmap[p] = struct{}{} @@ -1060,10 +1064,12 @@ func (gs *GossipSubRouter) Join(topic string) { delete(gs.fanout, topic) delete(gs.lastpub, topic) } else { + backoff := gs.backoff[topic] peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool { - // filter direct peers and peers with negative score + // filter direct peers, peers we are backing off and peers with negative score _, direct := gs.direct[p] - return !direct && gs.score.Score(p) >= 0 + _, doBackOff := backoff[p] + return !direct && !doBackOff && gs.score.Score(p) >= 0 }) gmap = peerListToMap(peers) gs.mesh[topic] = gmap diff --git a/gossipsub_test.go b/gossipsub_test.go index bd0aa570..96e822e0 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -1862,6 +1862,51 @@ func TestGossipSubLeaveTopic(t *testing.T) { } } +func TestGossipSubJoinTopic(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := getNetHosts(t, ctx, 3) + psubs := []*PubSub{ + getGossipsub(ctx, h[0]), + getGossipsub(ctx, h[1]), + getGossipsub(ctx, h[2]), + } + + connect(t, h[0], h[1]) + connect(t, h[0], h[2]) + + router0 := psubs[0].rt.(*GossipSubRouter) + + // Add in backoff for peer. + peerMap := make(map[peer.ID]time.Time) + peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff) + + router0.backoff["test"] = peerMap + + // Join all peers + var subs []*Subscription + for _, ps := range psubs { + sub, err := ps.Subscribe("test") + if err != nil { + t.Fatal(err) + } + subs = append(subs, sub) + } + + time.Sleep(time.Second) + + meshMap := router0.mesh["test"] + if len(meshMap) != 1 { + t.Fatalf("Unexpect peer included in the mesh") + } + + _, ok := meshMap[h[1].ID()] + if ok { + t.Fatalf("Peer that was to be backed off is included in the mesh") + } +} + type sybilSquatter struct { h host.Host }