Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix tests on web3-bot/sync #416

Draft
wants to merge 9 commits into
base: web3-bot/sync
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 12 additions & 16 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,18 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {

// Wait for network to finish forming then join the partitions via discovery
for _, ps := range psubs {
waitUntilGossipsubMeshCount(ps, topic, partitionSize-1)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this here becasuse waitUntilGossipsubMeshCount was reported unused by staticcheck (the test is skipped), and this was the only place where it was used. I'd be happy to move it back for clarity if needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just comment it out -- also isn't there any way to selectively disable the (imo worthless) tool?

done := false
doneCh := make(chan bool, 1)
rt := ps.rt.(*GossipSubRouter)
for !done {
ps.eval <- func() {
doneCh <- len(rt.mesh[topic]) == partitionSize-1
}
done = <-doneCh
if !done {
time.Sleep(100 * time.Millisecond)
}
}
}

for i := 0; i < partitionSize; i++ {
Expand Down Expand Up @@ -291,18 +302,3 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
}
}
}

func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) {
done := false
doneCh := make(chan bool, 1)
rt := ps.rt.(*GossipSubRouter)
for !done {
ps.eval <- func() {
doneCh <- len(rt.mesh[topic]) == count
}
done = <-doneCh
if !done {
time.Sleep(100 * time.Millisecond)
}
}
}
2 changes: 0 additions & 2 deletions gossip_tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ func TestBrokenPromises(t *testing.T) {
peerB := peer.ID("B")
peerC := peer.ID("C")

var msgs []*pb.Message
var mids []string
for i := 0; i < 100; i++ {
m := makeTestMessage(i)
m.From = []byte(peerA)
msgs = append(msgs, m)
mid := DefaultMsgIdFn(m)
mids = append(mids, mid)
}
Expand Down
2 changes: 0 additions & 2 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,6 @@ func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
case gs.connect <- ci:
default:
log.Debugf("ignoring peer connection attempt; too many pending connections")
break
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

staticcheck reports this as an ineffective break, which I don't think is true. On the other hand, why not go through the rest of the list, maybe the channel buffer will be more free as you go?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's useless, it can be removed.

}
}
}
Expand Down Expand Up @@ -1240,7 +1239,6 @@ func (gs *GossipSubRouter) heartbeatTimer() {
}

func (gs *GossipSubRouter) heartbeat() {
defer log.EventBegin(gs.p.ctx, "heartbeat").Done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event tracing through the logger is deprecated.


gs.heartbeatTicks++

Expand Down
6 changes: 1 addition & 5 deletions gossipsub_connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,9 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {

// sybil squatters to be connected later
sybilHosts := getNetHosts(t, ctx, nSquatter)
squatters := make([]*sybilSquatter, 0, nSquatter)
for _, h := range sybilHosts {
squatter := &sybilSquatter{h: h}
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
squatters = append(squatters, squatter)
}

// connect the honest hosts
Expand All @@ -97,14 +95,12 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {

// subscribe everyone to the topic
topic := "test"
var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe(topic)
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
}

// sleep to allow meshes to form
Expand Down
8 changes: 1 addition & 7 deletions gossipsub_feat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,7 @@ func TestDefaultGossipSubFeatures(t *testing.T) {
func TestGossipSubCustomProtocols(t *testing.T) {
customsub := protocol.ID("customsub/1.0.0")
protos := []protocol.ID{customsub, FloodSubID}
features := func(feat GossipSubFeature, proto protocol.ID) bool {
if proto == customsub {
return true
}

return false
}
features := func(feat GossipSubFeature, proto protocol.ID) bool { return proto == customsub }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
67 changes: 43 additions & 24 deletions gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
Expand All @@ -15,7 +16,6 @@ import (

pb "github.com/libp2p/go-libp2p-pubsub/pb"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-msgio/protoio"
)

Expand Down Expand Up @@ -188,6 +188,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {

newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
errs := make(chan error)
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
Expand Down Expand Up @@ -217,14 +218,14 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
// per heartbeat
iwc := getIWantCount()
if iwc > GossipSubMaxIHaveLength {
t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc)
errs <- fmt.Errorf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc)
}
firstBatchCount := iwc

// the score should still be 0 because we haven't broken any promises yet
score := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score != 0 {
t.Fatalf("Expected 0 score, but got %f", score)
errs <- fmt.Errorf("Expected 0 score, but got %f", score)
}

// Send a bunch of IHAVEs
Expand All @@ -240,24 +241,29 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
// Should have sent more IWANTs after the heartbeat
iwc = getIWantCount()
if iwc == firstBatchCount {
t.Fatal("Expecting to receive more IWANTs after heartbeat but did not")
errs <- fmt.Errorf("Expecting to receive more IWANTs after heartbeat but did not")
}
// Should not be more than the maximum per heartbeat
if iwc-firstBatchCount > GossipSubMaxIHaveLength {
t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
errs <- fmt.Errorf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
}

time.Sleep(GossipSubIWantFollowupTime)

// The score should now be negative because of broken promises
score = ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score >= 0 {
t.Fatalf("Expected negative score, but got %f", score)
errs <- fmt.Errorf("Expected negative score, but got %f", score)
}
close(errs)
}()
}
}

for err := range errs {
t.Fatal(err)
}

// Record the count of received IWANT messages
if ctl := irpc.GetControl(); ctl != nil {
addIWantCount(len(ctl.GetIwant()))
Expand Down Expand Up @@ -405,6 +411,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {

newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
errs := make(chan error)
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
Expand All @@ -424,7 +431,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
// No PRUNE should have been sent at this stage
pc := getPruneCount()
if pc != 0 {
t.Fatalf("Expected %d PRUNE messages but got %d", 0, pc)
errs <- fmt.Errorf("Expected %d PRUNE messages but got %d", 0, pc)
}

// Send a PRUNE to remove the attacker node from the legit
Expand All @@ -440,7 +447,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
// No PRUNE should have been sent at this stage
pc = getPruneCount()
if pc != 0 {
t.Fatalf("Expected %d PRUNE messages but got %d", 0, pc)
errs <- fmt.Errorf("Expected %d PRUNE messages but got %d", 0, pc)
}

// wait for the GossipSubGraftFloodThreshold to pass before attempting another graft
Expand All @@ -458,12 +465,12 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
// yet.
pc = getPruneCount()
if pc != 1 {
t.Fatalf("Expected %d PRUNE messages but got %d", 1, pc)
errs <- fmt.Errorf("Expected %d PRUNE messages but got %d", 1, pc)
}

score1 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score1 >= 0 {
t.Fatalf("Expected negative score, but got %f", score1)
errs <- fmt.Errorf("Expected negative score, but got %f", score1)
}

// Send a GRAFT again to attempt to rejoin the mesh
Expand All @@ -477,12 +484,12 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
// a PRUNE because we are before the flood threshold
pc = getPruneCount()
if pc != 2 {
t.Fatalf("Expected %d PRUNE messages but got %d", 2, pc)
errs <- fmt.Errorf("Expected %d PRUNE messages but got %d", 2, pc)
}

score2 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score2 >= score1 {
t.Fatalf("Expected score below %f, but got %f", score1, score2)
errs <- fmt.Errorf("Expected score below %f, but got %f", score1, score2)
}

// Send another GRAFT; this should get us a PRUNE, but penalize us below the graylist threshold
Expand All @@ -494,15 +501,18 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {

pc = getPruneCount()
if pc != 3 {
t.Fatalf("Expected %d PRUNE messages but got %d", 3, pc)
errs <- fmt.Errorf("Expected %d PRUNE messages but got %d", 3, pc)
close(errs)
}

score3 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score3 >= score2 {
t.Fatalf("Expected score below %f, but got %f", score2, score3)
errs <- fmt.Errorf("Expected score below %f, but got %f", score2, score3)
close(errs)
}
if score3 >= -1000 {
t.Fatalf("Expected score below %f, but got %f", -1000.0, score3)
errs <- fmt.Errorf("Expected score below %f, but got %f", -1000.0, score3)
close(errs)
}

// Wait for the PRUNE backoff to expire and try again; this time we should fail
Expand All @@ -518,7 +528,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {

pc = getPruneCount()
if pc != 3 {
t.Fatalf("Expected %d PRUNE messages but got %d", 3, pc)
errs <- fmt.Errorf("Expected %d PRUNE messages but got %d", 3, pc)
}

// make sure we are _not_ in the mesh
Expand All @@ -531,12 +541,16 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {

inMesh := <-res
if inMesh {
t.Fatal("Expected to not be in the mesh of the legitimate host")
errs <- fmt.Errorf("Expected to not be in the mesh of the legitimate host")
}
}()
}
}

for err := range errs {
t.Fatal(err)
}

if ctl := irpc.GetControl(); ctl != nil {
addPruneCount(len(ctl.GetPrune()))
}
Expand Down Expand Up @@ -642,6 +656,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {

newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
errs := make(chan error)
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
Expand All @@ -655,7 +670,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {

// Attacker score should start at zero
if attackerScore() != 0 {
t.Fatalf("Expected attacker score to be zero but it's %f", attackerScore())
errs <- fmt.Errorf("Expected attacker score to be zero but it's %f", attackerScore())
}

// Send a bunch of messages with no signature (these will
Expand All @@ -677,21 +692,25 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {

// The attackers score should now have fallen below zero
if attackerScore() >= 0 {
t.Fatalf("Expected attacker score to be less than zero but it's %f", attackerScore())
errs <- fmt.Errorf("Expected attacker score to be less than zero but it's %f", attackerScore())
}
// There should be several rejected messages (because the signature was invalid)
if tracer.rejectCount == 0 {
t.Fatal("Expected message rejection but got none")
errs <- fmt.Errorf("Expected message rejection but got none")
}
// The legit node should have sent a PRUNE message
pc := getPruneCount()
if pc == 0 {
t.Fatal("Expected attacker node to be PRUNED when score drops low enough")
errs <- fmt.Errorf("Expected attacker node to be PRUNED when score drops low enough")
}
}()
}
}

for err := range errs {
t.Fatal(err)
}

if ctl := irpc.GetControl(); ctl != nil {
addPruneCount(len(ctl.GetPrune()))
}
Expand All @@ -702,9 +721,9 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
<-ctx.Done()
}

func turnOnPubsubDebug() {
logging.SetLogLevel("pubsub", "debug")
}
// func turnOnPubsubDebug() {
// logging.SetLogLevel("pubsub", "debug")
// }

type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)

Expand Down