Skip to content

Commit

Permalink
feat: broadcast size should not be capped for newly connected neighbo…
Browse files Browse the repository at this point in the history
…rs (#3367)
  • Loading branch information
istae committed Oct 12, 2022
1 parent 13a4704 commit 02c2f25
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 3 deletions.
18 changes: 15 additions & 3 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,12 +986,24 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) error {
var addrs []swarm.Address

depth := k.NeighborhoodDepth()
isNeighbor := swarm.Proximity(peer.Bytes(), k.base.Bytes()) >= depth

outer:
for bin := uint8(0); bin < swarm.MaxBins; bin++ {

connectedPeers, err := randomSubset(k.binReachablePeers(bin), broadcastBinSize)
if err != nil {
return err
var (
connectedPeers []swarm.Address
err error
)

if bin >= depth && isNeighbor {
connectedPeers = k.binReachablePeers(bin) // broadcast all neighborhood peers
} else {
connectedPeers, err = randomSubset(k.binReachablePeers(bin), broadcastBinSize)
if err != nil {
return err
}
}

for _, connectedPeer := range connectedPeers {
Expand Down
87 changes: 87 additions & 0 deletions pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,93 @@ func TestAnnounceBgBroadcast(t *testing.T) {
}
}

func TestAnnounceNeighborhoodToNeighbor(t *testing.T) {
t.Parallel()

defer func(p int) {
*kademlia.SaturationPeers = p
}(*kademlia.SaturationPeers)
*kademlia.SaturationPeers = 4

defer func(p int) {
*kademlia.OverSaturationPeers = p
}(*kademlia.OverSaturationPeers)
*kademlia.OverSaturationPeers = 4

done := make(chan struct{})

mtx := sync.Mutex{}

var neighborAddr swarm.Address
const broadCastSize = 18

var (
conns int32
disc = mock.NewDiscovery(
mock.WithBroadcastPeers(func(ctx context.Context, p swarm.Address, addrs ...swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
if p.Equal(neighborAddr) {
if len(addrs) == broadCastSize {
close(done)
} else {
t.Fatal("broadcasted size did not match neighborhood size", "got", len(addrs), "want", broadCastSize)
}
}
return nil
}),
)
base, kad, ab, _, signer = newTestKademliaWithDiscovery(t, disc, &conns, nil, kademlia.Options{
ReachabilityFunc: func(swarm.Address) bool { return false },
})
)

neighborAddr = test.RandomAddressAt(base, 2)

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}

// add some peers
for bin := 0; bin < 2; bin++ {
for i := 0; i < 4; i++ {
addr := test.RandomAddressAt(base, bin)
addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1)
}
}

waitPeers(t, kad, 8)
kDepth(t, kad, 1)

// add many more neighbors
for i := 0; i < 10; i++ {
addr := test.RandomAddressAt(base, 2)
addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1)
}

waitPeers(t, kad, broadCastSize)
kDepth(t, kad, 2)

// add one neighbor to track how many peers are broadcasted to it
addOne(t, signer, kad, ab, neighborAddr)

waitCounter(t, &conns, 1)
waitPeers(t, kad, broadCastSize+1)
kDepth(t, kad, 2)

if err := kad.Close(); err != nil {
t.Fatal(err)
}

select {
case <-done:
case <-time.After(time.Second):
t.Fatal("broadcast did not fire for broadcastTo peer")
}
}

func TestIteratorOpts(t *testing.T) {
var (
conns int32 // how many connect calls were made to the p2p mock
Expand Down

0 comments on commit 02c2f25

Please sign in to comment.