Skip to content

Commit

Permalink
p2p: improve PEX reactor (tendermint#6305)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters authored and tychoish committed May 5, 2021
1 parent 588c877 commit aea77db
Show file tree
Hide file tree
Showing 15 changed files with 2,109 additions and 205 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -57,6 +57,9 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi

- [config] Add `--mode` flag and config variable. See [ADR-52](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-052-tendermint-mode.md) @dongsam
- [rpc] \#6329 Don't cap page size in unsafe mode (@gotjoshua, @cmwaters)
- [pex] \#6305 v2 pex reactor with backwards compatability. Introduces two new pex messages to
accomodate for the new p2p stack. Removes the notion of seeds and crawling. All peer
exchange reactors behave the same. (@cmwaters)
- [crypto] \#6376 Enable sr25519 as a validator key

### IMPROVEMENTS
Expand Down
5 changes: 4 additions & 1 deletion blockchain/v0/reactor_test.go
Expand Up @@ -300,7 +300,10 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
// XXX: This causes a potential race condition.
// See: https://github.com/tendermint/tendermint/issues/6005
otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30)
newNode := rts.network.MakeNode(t)
newNode := rts.network.MakeNode(t, p2ptest.NodeOptions{
MaxPeers: uint16(len(rts.nodes) + 1),
MaxConnected: uint16(len(rts.nodes) + 1),
})
rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight)

// add a fake peer just so we do not wait for the consensus ticker to timeout
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Expand Up @@ -646,7 +646,7 @@ func createPeerManager(
}

for _, peer := range peers {
if err := peerManager.Add(peer); err != nil {
if _, err := peerManager.Add(peer); err != nil {
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
}
}
Expand Down
20 changes: 16 additions & 4 deletions p2p/p2ptest/network.go
Expand Up @@ -30,6 +30,12 @@ type Network struct {
type NetworkOptions struct {
NumNodes int
BufferSize int
NodeOpts NodeOptions
}

type NodeOptions struct {
MaxPeers uint16
MaxConnected uint16
}

func (opts *NetworkOptions) setDefaults() {
Expand All @@ -50,7 +56,7 @@ func MakeNetwork(t *testing.T, opts NetworkOptions) *Network {
}

for i := 0; i < opts.NumNodes; i++ {
node := network.MakeNode(t)
node := network.MakeNode(t, opts.NodeOpts)
network.Nodes[node.NodeID] = node
}

Expand Down Expand Up @@ -81,7 +87,9 @@ func (n *Network) Start(t *testing.T) {
for _, targetAddress := range dialQueue[i+1:] { // nodes <i already connected
targetNode := n.Nodes[targetAddress.NodeID]
targetSub := subs[targetAddress.NodeID]
require.NoError(t, sourceNode.PeerManager.Add(targetAddress))
added, err := sourceNode.PeerManager.Add(targetAddress)
require.NoError(t, err)
require.True(t, added)

select {
case peerUpdate := <-sourceSub.Updates():
Expand All @@ -107,7 +115,9 @@ func (n *Network) Start(t *testing.T) {

// Add the address to the target as well, so it's able to dial the
// source back if that's even necessary.
require.NoError(t, targetNode.PeerManager.Add(sourceAddress))
added, err = targetNode.PeerManager.Add(sourceAddress)
require.NoError(t, err)
require.True(t, added)
}
}
}
Expand Down Expand Up @@ -214,7 +224,7 @@ type Node struct {
// MakeNode creates a new Node configured for the network with a
// running peer manager, but does not add it to the existing
// network. Callers are responsible for updating peering relationships.
func (n *Network) MakeNode(t *testing.T) *Node {
func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
privKey := ed25519.GenPrivKey()
nodeID := p2p.NodeIDFromPubKey(privKey.PubKey())
nodeInfo := p2p.NodeInfo{
Expand All @@ -230,6 +240,8 @@ func (n *Network) MakeNode(t *testing.T) *Node {
MinRetryTime: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
})
require.NoError(t, err)

Expand Down
34 changes: 25 additions & 9 deletions p2p/peermanager.go
Expand Up @@ -384,13 +384,14 @@ func (m *PeerManager) prunePeers() error {
}

// Add adds a peer to the manager, given as an address. If the peer already
// exists, the address is added to it if not already present.
func (m *PeerManager) Add(address NodeAddress) error {
// exists, the address is added to it if it isn't already present. This will push
// low scoring peers out of the address book if it exceeds the maximum size.
func (m *PeerManager) Add(address NodeAddress) (bool, error) {
if err := address.Validate(); err != nil {
return err
return false, err
}
if address.NodeID == m.selfID {
return fmt.Errorf("can't add self (%v) to peer store", m.selfID)
return false, fmt.Errorf("can't add self (%v) to peer store", m.selfID)
}

m.mtx.Lock()
Expand All @@ -400,17 +401,32 @@ func (m *PeerManager) Add(address NodeAddress) error {
if !ok {
peer = m.newPeerInfo(address.NodeID)
}
if _, ok := peer.AddressInfo[address]; !ok {
peer.AddressInfo[address] = &peerAddressInfo{Address: address}
_, ok = peer.AddressInfo[address]
// if we already have the peer address, there's no need to continue
if ok {
return false, nil
}

// else add the new address
peer.AddressInfo[address] = &peerAddressInfo{Address: address}
if err := m.store.Set(peer); err != nil {
return err
return false, err
}
if err := m.prunePeers(); err != nil {
return err
return true, err
}
m.dialWaker.Wake()
return nil
return true, nil
}

// PeerRatio returns the ratio of peer addresses stored to the maximum size.
func (m *PeerManager) PeerRatio() float64 {
if m.options.MaxPeers == 0 {
return 0
}
m.mtx.Lock()
defer m.mtx.Unlock()
return float64(m.store.Size()) / float64(m.options.MaxPeers)
}

// DialNext finds an appropriate peer address to dial, and marks it as dialing.
Expand Down
4 changes: 3 additions & 1 deletion p2p/peermanager_scoring_test.go
Expand Up @@ -23,7 +23,9 @@ func TestPeerScoring(t *testing.T) {

// create a fake node
id := NodeID(strings.Repeat("a1", 20))
require.NoError(t, peerManager.Add(NodeAddress{NodeID: id, Protocol: "memory"}))
added, err := peerManager.Add(NodeAddress{NodeID: id, Protocol: "memory"})
require.NoError(t, err)
require.True(t, added)

t.Run("Synchronous", func(t *testing.T) {
// update the manager and make sure it's correct
Expand Down

0 comments on commit aea77db

Please sign in to comment.