diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index c19ecca24a9d..7adb14366f71 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -57,6 +57,9 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [config] Add `--mode` flag and config variable. See [ADR-52](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-052-tendermint-mode.md) @dongsam
 - [rpc] \#6329 Don't cap page size in unsafe mode (@gotjoshua, @cmwaters)
+- [pex] \#6305 v2 pex reactor with backwards compatibility. Introduces two new pex messages to
+  accommodate the new p2p stack. Removes the notion of seeds and crawling. All peer
+  exchange reactors behave the same. (@cmwaters)
 - [crypto] \#6376 Enable sr25519 as a validator key

 ### IMPROVEMENTS
diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go
index 64a66a6a0bef..ffce2b3ccbec 100644
--- a/blockchain/v0/reactor_test.go
+++ b/blockchain/v0/reactor_test.go
@@ -300,7 +300,10 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
 	// XXX: This causes a potential race condition.
 	// See: https://github.com/tendermint/tendermint/issues/6005
 	otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30)
-	newNode := rts.network.MakeNode(t)
+	newNode := rts.network.MakeNode(t, p2ptest.NodeOptions{
+		MaxPeers:     uint16(len(rts.nodes) + 1),
+		MaxConnected: uint16(len(rts.nodes) + 1),
+	})
 	rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight)

 	// add a fake peer just so we do not wait for the consensus ticker to timeout
diff --git a/node/node.go b/node/node.go
index 3c396a8b9178..fbf47d257d22 100644
--- a/node/node.go
+++ b/node/node.go
@@ -646,7 +646,7 @@ func createPeerManager(
 	}

 	for _, peer := range peers {
-		if err := peerManager.Add(peer); err != nil {
+		if _, err := peerManager.Add(peer); err != nil {
 			return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
 		}
 	}
diff --git a/p2p/p2ptest/network.go b/p2p/p2ptest/network.go
index 47332a240705..a5a858f66eb7 100644
--- a/p2p/p2ptest/network.go
+++ b/p2p/p2ptest/network.go
@@ -30,6 +30,12 @@ type Network struct {
 type NetworkOptions struct {
 	NumNodes   int
 	BufferSize int
+	NodeOpts   NodeOptions
+}
+
+type NodeOptions struct {
+	MaxPeers     uint16
+	MaxConnected uint16
 }

 func (opts *NetworkOptions) setDefaults() {
@@ -50,7 +56,7 @@ func MakeNetwork(t *testing.T, opts NetworkOptions) *Network {
 	}

 	for i := 0; i < opts.NumNodes; i++ {
-		node := network.MakeNode(t)
+		node := network.MakeNode(t, opts.NodeOpts)
 		network.Nodes[node.NodeID] = node
 	}

@@ -81,7 +87,9 @@ func (n *Network) Start(t *testing.T) {
 		for _, targetAddress := range dialQueue[i+1:] { // nodes int(maxAddresses) {
+			return fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
+				maxAddresses,
+				len(msg.Addresses),
+			)
+		}
+
 		for _, pexAddress := range msg.Addresses {
+			// no protocol is prefixed so we assume the default (mconn)
 			peerAddress, err := p2p.ParseNodeAddress(
 				fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port))
 			if err != nil {
-				logger.Debug("invalid PEX address", "address", pexAddress, "err", err)
 				continue
 			}
-			if err = r.peerManager.Add(peerAddress); err != nil {
-				logger.Debug("failed to register PEX address", "address", peerAddress, "err", err)
+			added, err := r.peerManager.Add(peerAddress)
+			if err != nil {
+				logger.Error("failed to add PEX address", "address", peerAddress, "err", err)
+			}
+			if added {
+				r.newPeers++
+				logger.Debug("added PEX address", "address", peerAddress)
+			}
+			r.totalPeers++
+		}
+
+		// V2 PEX MESSAGES
+	case *protop2p.PexRequestV2:
+		// check that the peer hasn't sent a prior request too recently
+		if err := r.markPeerRequest(envelope.From); err != nil {
+			return err
+		}
+
+		// request peers from the peer manager and format the NodeAddresses as
+		// URL strings
+		nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
+		pexAddressesV2 := make([]protop2p.PexAddressV2, len(nodeAddresses))
+		for idx, addr := range nodeAddresses {
+			pexAddressesV2[idx] = protop2p.PexAddressV2{
+				URL: addr.String(),
 			}
 		}
+		r.pexCh.Out <- p2p.Envelope{
+			To:      envelope.From,
+			Message: &protop2p.PexResponseV2{Addresses: pexAddressesV2},
+		}
+
+	case *protop2p.PexResponseV2:
+		// check if the response matches a request that was made to that peer
+		if err := r.markPeerResponse(envelope.From); err != nil {
+			return err
+		}
+
+		// check the size of the response
+		if len(msg.Addresses) > int(maxAddresses) {
+			return fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
+				maxAddresses,
+				len(msg.Addresses),
+			)
+		}
+
+		for _, pexAddress := range msg.Addresses {
+			peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
+			if err != nil {
+				continue
+			}
+			added, err := r.peerManager.Add(peerAddress)
+			if err != nil {
+				logger.Error("failed to add V2 PEX address", "address", peerAddress, "err", err)
+			}
+			if added {
+				r.newPeers++
+				logger.Debug("added V2 PEX address", "address", peerAddress)
+			}
+			r.totalPeers++
+		}

 	default:
 		return fmt.Errorf("received unknown message: %T", msg)
@@ -119,23 +301,31 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
 //
 // FIXME: We may want to cache and parallelize this, but for now we'll just rely
 // on the operating system to cache it for us.
-func (r *ReactorV2) resolve(addresses []p2p.NodeAddress, limit uint16) []protop2p.PexAddress {
-	pexAddresses := make([]protop2p.PexAddress, 0, len(addresses))
+func (r *ReactorV2) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress {
+	limit := len(addresses)
+	pexAddresses := make([]protop2p.PexAddress, 0, limit)
 	for _, address := range addresses {
 		ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
 		endpoints, err := address.Resolve(ctx)
+		r.Logger.Debug("resolved node address", "endpoints", endpoints)
 		cancel()
 		if err != nil {
 			r.Logger.Debug("failed to resolve address", "address", address, "err", err)
 			continue
 		}
 		for _, endpoint := range endpoints {
-			if len(pexAddresses) >= int(limit) {
+			r.Logger.Debug("checking endpoint", "IP", endpoint.IP, "Port", endpoint.Port)
+			if len(pexAddresses) >= limit {
 				return pexAddresses
 			} else if endpoint.IP != nil {
+				r.Logger.Debug("appending pex address")
 				// PEX currently only supports IP-networked transports (as
 				// opposed to e.g. p2p.MemoryTransport).
+				//
+				// FIXME: as the PEX address contains no information about the
+				// protocol, we jam this into the ID. We won't need to do this
+				// once we support URLs
 				pexAddresses = append(pexAddresses, protop2p.PexAddress{
 					ID:   string(address.NodeID),
 					IP:   endpoint.IP.String(),
@@ -157,7 +347,7 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er
 		}
 	}()

-	r.Logger.Debug("received message", "peer", envelope.From)
+	r.Logger.Debug("received PEX message", "peer", envelope.From)

 	switch chID {
 	case p2p.ChannelID(PexChannel):
@@ -170,56 +360,155 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er
 	return err
 }

-// processPexCh implements a blocking event loop where we listen for p2p
-// Envelope messages from the pexCh.
-func (r *ReactorV2) processPexCh() {
-	defer r.pexCh.Close()
+// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
+// send a request for addresses.
+func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
+	r.Logger.Debug("received PEX peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
+	switch peerUpdate.Status {
+	case p2p.PeerStatusUp:
+		r.availablePeers.PushBack(peerUpdate.NodeID)
+	case p2p.PeerStatusDown:
+		r.removePeer(peerUpdate.NodeID)
+	default:
+	}
+}

-	for {
-		select {
-		case envelope := <-r.pexCh.In:
-			if err := r.handleMessage(r.pexCh.ID, envelope); err != nil {
-				r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
-				r.pexCh.Error <- p2p.PeerError{
-					NodeID: envelope.From,
-					Err:    err,
-				}
-			}
+func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time {
+	return time.After(time.Until(r.nextRequestTime))
+}

-		case <-r.closeCh:
-			r.Logger.Debug("stopped listening on PEX channel; closing...")
-			return
+// sendRequestForPeers pops the first peerID off the list and sends that peer
+// a request for more peer addresses. The function then moves the peer into
+// the requestsSent bucket and calculates when the next request should be made.
+func (r *ReactorV2) sendRequestForPeers() {
+	peer := r.availablePeers.Front()
+	if peer == nil {
+		// no peers are available
+		r.Logger.Debug("no available peers to send request to, waiting...")
+		r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod)
+		return
+	}
+	peerID := peer.Value.(p2p.NodeID)
+
+	// The node accommodates both pex systems
+	if r.isLegacyPeer(peerID) {
+		r.pexCh.Out <- p2p.Envelope{
+			To:      peerID,
+			Message: &protop2p.PexRequest{},
+		}
+	} else {
+		r.pexCh.Out <- p2p.Envelope{
+			To:      peerID,
+			Message: &protop2p.PexRequestV2{},
 		}
 	}
+
+	// remove the peer from the available peers list and mark it in the requestsSent map
+	r.availablePeers.Remove(peer)
+	peer.DetachPrev()
+	r.mtx.Lock()
+	r.requestsSent[peerID] = struct{}{}
+	r.mtx.Unlock()
+
+	r.calculateNextRequestTime()
+	r.Logger.Debug("peer request sent", "next_request_time", r.nextRequestTime)
 }

-// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
-// send a request for addresses.
-func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
-	r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
+// calculateNextRequestTime implements something of a proportional controller
+// to estimate how often the reactor should be requesting new peer addresses.
+// The dependent variable in this calculation is the ratio of new peers to
+// all peers that the reactor receives. The interval is thus calculated as the
+// inverse of that ratio, squared. In the beginning, all peers should be new
+// peers. We expect this ratio to be near 1, and thus the interval to be as
+// short as possible. As the node becomes more familiar with the network, the
+// ratio of new nodes plummets to a very small number, and the interval
+// expands to its upper bound.
+// CONTRACT: Must use a write lock as nextRequestTime is updated
+func (r *ReactorV2) calculateNextRequestTime() {
+	// Check if the peer store is full. If so, there is no need to send peer
+	// requests so often.
+	if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 {
+		r.Logger.Debug("peer manager near full ratio, sleeping...",
+			"sleep_period", fullCapacityInterval, "ratio", ratio)
+		r.nextRequestTime = time.Now().Add(fullCapacityInterval)
+		return
+	}

-	if peerUpdate.Status == p2p.PeerStatusUp {
-		r.pexCh.Out <- p2p.Envelope{
-			To:      peerUpdate.NodeID,
-			Message: &protop2p.PexRequest{},
+	// baseTime represents the shortest interval at which we can send peer
+	// requests. For example, if we have 10 peers and may only message the
+	// same peer once every 500ms, then we can send a request every 50ms. In
+	// practice we use a safety margin of 2, ergo 100ms
+	peers := tmmath.MinInt(r.availablePeers.Len(), 50)
+	baseTime := minReceiveRequestInterval
+	if peers > 0 {
+		baseTime = minReceiveRequestInterval * 2 / time.Duration(peers)
+	}
+
+	if r.totalPeers > 0 || r.discoveryRatio == 0 {
+		// find the inverse ratio of new peers to all received peers.
+		// NOTE: we add 1 to both terms to avoid dividing by zero
+		ratio := float32(r.totalPeers+1) / float32(r.newPeers+1)
+		// square the ratio in order to get non-linear time intervals
+		// NOTE: The longest possible interval for a network with 100 or more peers
+		// where a node is connected to 50 of them is 2 minutes.
+		r.discoveryRatio = ratio * ratio
+		r.newPeers = 0
+		r.totalPeers = 0
+	}
+	// NOTE: As ratio is always >= 1, the discovery ratio is also >= 1. Therefore
+	// we don't need to worry about the next request time being less than the
+	// minimum time
+	r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio))
+}
+
+func (r *ReactorV2) removePeer(id p2p.NodeID) {
+	for e := r.availablePeers.Front(); e != nil; e = e.Next() {
+		if e.Value == id {
+			r.availablePeers.Remove(e)
+			e.DetachPrev()
+			break
 		}
 	}
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+	delete(r.requestsSent, id)
+	delete(r.lastReceivedRequests, id)
 }

-// processPeerUpdates initiates a blocking process where we listen for and handle
-// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
-// close the p2p PeerUpdatesCh gracefully.
-func (r *ReactorV2) processPeerUpdates() {
-	defer r.peerUpdates.Close()
+func (r *ReactorV2) markPeerRequest(peer p2p.NodeID) error {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+	if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok {
+		if time.Now().Before(lastRequestTime.Add(minReceiveRequestInterval)) {
+			return fmt.Errorf("peer sent a request too close after a prior one. Minimum interval: %v",
Minimum interval: %v", + minReceiveRequestInterval) + } + } + r.lastReceivedRequests[peer] = time.Now() + return nil +} - for { - select { - case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(peerUpdate) +func (r *ReactorV2) markPeerResponse(peer p2p.NodeID) error { + r.mtx.Lock() + defer r.mtx.Unlock() + // check if a request to this peer was sent + if _, ok := r.requestsSent[peer]; !ok { + return fmt.Errorf("peer sent a PEX response when none was requested (%v)", peer) + } + delete(r.requestsSent, peer) + // attach to the back of the list so that the peer can be used again for + // future requests + r.availablePeers.PushBack(peer) + return nil +} - case <-r.closeCh: - r.Logger.Debug("stopped listening on peer updates channel; closing...") - return +// all addresses must use a MCONN protocol for the peer to be considered part of the +// legacy p2p pex system +func (r *ReactorV2) isLegacyPeer(peer p2p.NodeID) bool { + for _, addr := range r.peerManager.Addresses(peer) { + if addr.Protocol != p2p.MConnProtocol { + return false } } + return true } diff --git a/p2p/pex/reactor_test.go b/p2p/pex/reactor_test.go new file mode 100644 index 000000000000..6db8c02337bd --- /dev/null +++ b/p2p/pex/reactor_test.go @@ -0,0 +1,722 @@ +package pex_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/p2ptest" + "github.com/tendermint/tendermint/p2p/pex" + proto "github.com/tendermint/tendermint/proto/tendermint/p2p" +) + +const ( + checkFrequency = 500 * time.Millisecond + defaultBufferSize = 2 + shortWait = 10 * time.Second + longWait = 60 * time.Second + + firstNode = 0 + secondNode = 1 + thirdNode = 2 + fourthNode = 3 +) + +func TestReactorBasic(t *testing.T) { + // start a network with one mock reactor and one "real" reactor + testNet := setup(t, testOptions{ + MockNodes: 1, + TotalNodes: 2, + }) + testNet.connectAll(t) + testNet.start(t) + + // assert that the mock node receives a request from the real node + testNet.listenForRequest(t, secondNode, firstNode, shortWait) + + // assert that when a mock node sends a request it receives a response (and + // the correct one) + testNet.sendRequest(t, firstNode, secondNode, true) + testNet.listenForResponse(t, secondNode, firstNode, shortWait, []proto.PexAddressV2(nil)) +} + +func TestReactorConnectFullNetwork(t *testing.T) { + testNet := setup(t, testOptions{ + TotalNodes: 8, + }) + + // make every node be only connected with one other node (it actually ends up + // being two because of two way connections but oh well) + testNet.connectN(t, 1) + testNet.start(t) + + // assert that all nodes add each other in the network + for idx := 0; idx < len(testNet.nodes); idx++ { + testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait) + } +} + +func TestReactorSendsRequestsTooOften(t *testing.T) { + testNet := setup(t, testOptions{ + MockNodes: 1, + TotalNodes: 3, + }) + testNet.connectAll(t) + testNet.start(t) + + // firstNode sends two requests to the secondNode + testNet.sendRequest(t, firstNode, secondNode, true) + testNet.sendRequest(t, firstNode, secondNode, true) + + // assert that the secondNode evicts the first node (although they reconnect + // straight away again) + testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait) + + // firstNode should still receive the address of the thirdNode by the secondNode + expectedAddrs := 
+	testNet.listenForResponse(t, secondNode, firstNode, shortWait, expectedAddrs)
+}
+
+func TestReactorSendsResponseWithoutRequest(t *testing.T) {
+	testNet := setup(t, testOptions{
+		MockNodes:  1,
+		TotalNodes: 3,
+	})
+	testNet.connectAll(t)
+	testNet.start(t)
+
+	// firstNode sends the secondNode an unrequested response
+	// NOTE: secondNode will send a request by default during startup so we send
+	// two responses to counter that.
+	testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
+	testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
+
+	// secondNode should evict the firstNode
+	testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
+}
+
+func TestReactorNeverSendsTooManyPeers(t *testing.T) {
+	testNet := setup(t, testOptions{
+		MockNodes:  1,
+		TotalNodes: 2,
+	})
+	testNet.connectAll(t)
+	testNet.start(t)
+
+	testNet.addNodes(t, 110)
+	nodes := make([]int, 110)
+	for i := 0; i < len(nodes); i++ {
+		nodes[i] = i + 2
+	}
+	testNet.addAddresses(t, secondNode, nodes)
+
+	// first we check that even though we have 110 peers, honest pex reactors
+	// only send 100 (test if secondNode sends firstNode 100 addresses)
+	testNet.pingAndlistenForNAddresses(t, secondNode, firstNode, shortWait, 100)
+}
+
+func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
+	testNet := setup(t, testOptions{
+		MockNodes:  1,
+		TotalNodes: 2,
+	})
+	testNet.connectAll(t)
+	testNet.start(t)
+
+	testNet.addNodes(t, 110)
+	nodes := make([]int, 110)
+	for i := 0; i < len(nodes); i++ {
+		nodes[i] = i + 2
+	}
+
+	// now we send a response with more than 100 peers
+	testNet.sendResponse(t, firstNode, secondNode, nodes, true)
+	// secondNode should evict the firstNode
+	testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
+}
+
+func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
+	testNet := setup(t, testOptions{
+		TotalNodes:   16,
+		MaxPeers:     8,
+		MaxConnected: 6,
+		BufferSize:   8,
+	})
+	testNet.connectN(t, 1)
+	testNet.start(t)
+
+	// test that all nodes reach full capacity
+	for _, nodeID := range testNet.nodes {
+		require.Eventually(t, func() bool {
+			// nolint:scopelint
+			return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9
+		}, longWait, checkFrequency)
+	}
+}
+
+func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) {
+	testNet := setup(t, testOptions{
+		TotalNodes:   10,
+		MaxPeers:     100,
+		MaxConnected: 100,
+		BufferSize:   10,
+	})
+	testNet.connectN(t, 1)
+	testNet.start(t)
+
+	// assert that all nodes add each other in the network
+	for idx := 0; idx < len(testNet.nodes); idx++ {
+		testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
+	}
+}
+
+func TestReactorWithNetworkGrowth(t *testing.T) {
+	testNet := setup(t, testOptions{
+		TotalNodes: 5,
+		BufferSize: 5,
+	})
+	testNet.connectAll(t)
+	testNet.start(t)
+
+	// assert that all nodes add each other in the network
+	for idx := 0; idx < len(testNet.nodes); idx++ {
+		testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, shortWait)
+	}
+
+	// now we inject 10 more nodes
+	testNet.addNodes(t, 10)
+	for i := 5; i < testNet.total; i++ {
+		node := testNet.nodes[i]
+		require.NoError(t, testNet.reactors[node].Start())
+		require.True(t, testNet.reactors[node].IsRunning())
+		// we connect all new nodes to a single entry point and check that the
+		// node can distribute the addresses to all the others
+		testNet.connectPeers(t, 0, i)
+	}
+	require.Len(t, testNet.reactors, 15)
+
+	// assert that all nodes add each other in the network
+	for idx := 0; idx < len(testNet.nodes); idx++ {
+		testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1, longWait)
+	}
+}
+
+func TestReactorIntegrationWithLegacyHandleRequest(t *testing.T) {
+	testNet := setup(t, testOptions{
+		MockNodes:  1,
+		TotalNodes: 3,
+	})
+	testNet.connectAll(t)
+	testNet.start(t)
+	t.Log(testNet.nodes)
+
+	// mock node sends a V1 Pex message to the second node
+	testNet.sendRequest(t, firstNode, secondNode, false)
+	addrs := testNet.getAddressesFor(t, []int{thirdNode})
+	testNet.listenForLegacyResponse(t, secondNode, firstNode, shortWait, addrs)
+}
+
+func TestReactorIntegrationWithLegacyHandleResponse(t *testing.T) {
+	testNet := setup(t, testOptions{
+		MockNodes:  1,
+		TotalNodes: 4,
+		BufferSize: 4,
+	})
+	testNet.connectPeers(t, firstNode, secondNode)
+	testNet.connectPeers(t, firstNode, thirdNode)
+	testNet.connectPeers(t, firstNode, fourthNode)
+	testNet.start(t)
+
+	testNet.listenForRequest(t, secondNode, firstNode, shortWait)
+	// send a v1 response instead
+	testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode, fourthNode}, false)
+	testNet.requireNumberOfPeers(t, secondNode, len(testNet.nodes)-1, shortWait)
+}
+
+type reactorTestSuite struct {
+	network *p2ptest.Network
+	logger  log.Logger
+
+	reactors    map[p2p.NodeID]*pex.ReactorV2
+	pexChannels map[p2p.NodeID]*p2p.Channel
+
+	peerChans   map[p2p.NodeID]chan p2p.PeerUpdate
+	peerUpdates map[p2p.NodeID]*p2p.PeerUpdates
+
+	nodes []p2p.NodeID
+	mocks []p2p.NodeID
+	total int
+	opts  testOptions
+}
+
+type testOptions struct {
+	MockNodes    int
+	TotalNodes   int
+	BufferSize   int
+	MaxPeers     uint16
+	MaxConnected uint16
+}
+
+// setup sets up a test suite with a network of nodes. Mock nodes are the
+// hollow nodes that the test can listen on and send messages from.
+func setup(t *testing.T, opts testOptions) *reactorTestSuite {
+	t.Helper()
+
+	require.Greater(t, opts.TotalNodes, opts.MockNodes)
+	if opts.BufferSize == 0 {
+		opts.BufferSize = defaultBufferSize
+	}
+	networkOpts := p2ptest.NetworkOptions{
+		NumNodes:   opts.TotalNodes,
+		BufferSize: opts.BufferSize,
+		NodeOpts: p2ptest.NodeOptions{
+			MaxPeers:     opts.MaxPeers,
+			MaxConnected: opts.MaxConnected,
+		},
+	}
+	chBuf := opts.BufferSize
+	realNodes := opts.TotalNodes - opts.MockNodes
+
+	rts := &reactorTestSuite{
+		logger:      log.TestingLogger().With("testCase", t.Name()),
+		network:     p2ptest.MakeNetwork(t, networkOpts),
+		reactors:    make(map[p2p.NodeID]*pex.ReactorV2, realNodes),
+		pexChannels: make(map[p2p.NodeID]*p2p.Channel, opts.TotalNodes),
+		peerChans:   make(map[p2p.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
+		peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
+		total:       opts.TotalNodes,
+		opts:        opts,
+	}
+
+	// NOTE: we don't assert that the channels get drained after stopping the
+	// reactor
+	rts.pexChannels = rts.network.MakeChannelsNoCleanup(
+		t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), chBuf,
+	)
+
+	idx := 0
+	for nodeID := range rts.network.Nodes {
+		rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
+		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
+		rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
+
+		// the first nodes in the array are always mock nodes
+		if idx < opts.MockNodes {
+			rts.mocks = append(rts.mocks, nodeID)
+		} else {
+			rts.reactors[nodeID] = pex.NewReactorV2(
+				rts.logger.With("nodeID", nodeID),
+				rts.network.Nodes[nodeID].PeerManager,
+				rts.pexChannels[nodeID],
+				
rts.peerUpdates[nodeID], + ) + } + rts.nodes = append(rts.nodes, nodeID) + + idx++ + } + + require.Len(t, rts.reactors, realNodes) + + t.Cleanup(func() { + for nodeID, reactor := range rts.reactors { + if reactor.IsRunning() { + require.NoError(t, reactor.Stop()) + require.False(t, reactor.IsRunning()) + } + rts.pexChannels[nodeID].Close() + rts.peerUpdates[nodeID].Close() + } + for _, nodeID := range rts.mocks { + rts.pexChannels[nodeID].Close() + rts.peerUpdates[nodeID].Close() + } + }) + + return rts +} + +// starts up the pex reactors for each node +func (r *reactorTestSuite) start(t *testing.T) { + t.Helper() + + for _, reactor := range r.reactors { + require.NoError(t, reactor.Start()) + require.True(t, reactor.IsRunning()) + } +} + +func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) { + t.Helper() + + for i := 0; i < nodes; i++ { + node := r.network.MakeNode(t, p2ptest.NodeOptions{ + MaxPeers: r.opts.MaxPeers, + MaxConnected: r.opts.MaxConnected, + }) + r.network.Nodes[node.NodeID] = node + nodeID := node.NodeID + r.pexChannels[nodeID] = node.MakeChannelNoCleanup( + t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), r.opts.BufferSize, + ) + r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize) + r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) + r.network.Nodes[nodeID].PeerManager.Register(r.peerUpdates[nodeID]) + r.reactors[nodeID] = pex.NewReactorV2( + r.logger.With("nodeID", nodeID), + r.network.Nodes[nodeID].PeerManager, + r.pexChannels[nodeID], + r.peerUpdates[nodeID], + ) + r.nodes = append(r.nodes, nodeID) + r.total++ + } +} + +func (r *reactorTestSuite) listenFor( + t *testing.T, + node p2p.NodeID, + conditional func(msg p2p.Envelope) bool, + assertion func(t *testing.T, msg p2p.Envelope) bool, + waitPeriod time.Duration, +) { + timesUp := time.After(waitPeriod) + for { + select { + case envelope := <-r.pexChannels[node].In: + if conditional(envelope) && assertion(t, envelope) { + return + } + case <-timesUp: + require.Fail(t, "timed out waiting for message", + "node=%v, waitPeriod=%s", node, waitPeriod) + } + } +} + +func (r *reactorTestSuite) listenForRequest(t *testing.T, fromNode, toNode int, waitPeriod time.Duration) { + r.logger.Info("Listening for request", "from", fromNode, "to", toNode) + to, from := r.checkNodePair(t, toNode, fromNode) + conditional := func(msg p2p.Envelope) bool { + _, ok := msg.Message.(*proto.PexRequestV2) + return ok && msg.From == from + } + assertion := func(t *testing.T, msg p2p.Envelope) bool { + require.Equal(t, &proto.PexRequestV2{}, msg.Message) + return true + } + r.listenFor(t, to, conditional, assertion, waitPeriod) +} + +func (r *reactorTestSuite) pingAndlistenForNAddresses( + t *testing.T, + fromNode, toNode int, + waitPeriod time.Duration, + addresses int, +) { + r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode) + to, from := r.checkNodePair(t, toNode, fromNode) + conditional := func(msg p2p.Envelope) bool { + _, ok := msg.Message.(*proto.PexResponseV2) + return ok && msg.From == from + } + assertion := func(t *testing.T, msg p2p.Envelope) bool { + m, ok := msg.Message.(*proto.PexResponseV2) + if !ok { + require.Fail(t, "expected pex response v2") + return true + } + // assert the same amount of addresses + if len(m.Addresses) == addresses { + return true + } + // if we didn't get the right length, we wait and send the + // request again + time.Sleep(300 * time.Millisecond) + r.sendRequest(t, toNode, fromNode, true) + return false + } + 
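// prime the exchange with an initial request; the assertion above
+	// re-sends it whenever the response has the wrong number of addresses
+	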
r.sendRequest(t, toNode, fromNode, true)
+	r.listenFor(t, to, conditional, assertion, waitPeriod)
+}
+
+func (r *reactorTestSuite) listenForResponse(
+	t *testing.T,
+	fromNode, toNode int,
+	waitPeriod time.Duration,
+	addresses []proto.PexAddressV2,
+) {
+	r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
+	to, from := r.checkNodePair(t, toNode, fromNode)
+	conditional := func(msg p2p.Envelope) bool {
+		_, ok := msg.Message.(*proto.PexResponseV2)
+		r.logger.Info("received message", "msg", msg, "ok", ok)
+		return ok && msg.From == from
+	}
+	assertion := func(t *testing.T, msg p2p.Envelope) bool {
+		require.Equal(t, &proto.PexResponseV2{Addresses: addresses}, msg.Message)
+		return true
+	}
+	r.listenFor(t, to, conditional, assertion, waitPeriod)
+}
+
+func (r *reactorTestSuite) listenForLegacyResponse(
+	t *testing.T,
+	fromNode, toNode int,
+	waitPeriod time.Duration,
+	addresses []proto.PexAddress,
+) {
+	r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
+	to, from := r.checkNodePair(t, toNode, fromNode)
+	conditional := func(msg p2p.Envelope) bool {
+		_, ok := msg.Message.(*proto.PexResponse)
+		return ok && msg.From == from
+	}
+	assertion := func(t *testing.T, msg p2p.Envelope) bool {
+		require.Equal(t, &proto.PexResponse{Addresses: addresses}, msg.Message)
+		return true
+	}
+	r.listenFor(t, to, conditional, assertion, waitPeriod)
+}
+
+func (r *reactorTestSuite) listenForPeerUpdate(
+	t *testing.T,
+	onNode, withNode int,
+	status p2p.PeerStatus,
+	waitPeriod time.Duration,
+) {
+	on, with := r.checkNodePair(t, onNode, withNode)
+	sub := r.network.Nodes[on].PeerManager.Subscribe()
+	defer sub.Close()
+	timesUp := time.After(waitPeriod)
+	for {
+		select {
+		case peerUpdate := <-sub.Updates():
+			if peerUpdate.NodeID == with {
+				require.Equal(t, status, peerUpdate.Status)
+				return
+			}
+
+		case <-timesUp:
+			require.Fail(t, "timed out waiting for peer status", "%v with status %v",
+				with, status)
+			return
+		}
+	}
+}
+
+func (r *reactorTestSuite) getV2AddressesFor(nodes []int) []proto.PexAddressV2 {
+	addresses := make([]proto.PexAddressV2, len(nodes))
+	for idx, node := range nodes {
+		nodeID := r.nodes[node]
+		addresses[idx] = proto.PexAddressV2{
+			URL: r.network.Nodes[nodeID].NodeAddress.String(),
+		}
+	}
+	return addresses
+}
+
+func (r *reactorTestSuite) getAddressesFor(t *testing.T, nodes []int) []proto.PexAddress {
+	addresses := make([]proto.PexAddress, len(nodes))
+	for idx, node := range nodes {
+		nodeID := r.nodes[node]
+		nodeAddrs := r.network.Nodes[nodeID].NodeAddress
+		endpoints, err := nodeAddrs.Resolve(context.Background())
+		require.NoError(t, err)
+		require.Len(t, endpoints, 1)
+		addresses[idx] = proto.PexAddress{
+			ID:   string(nodeAddrs.NodeID),
+			IP:   endpoints[0].IP.String(),
+			Port: uint32(endpoints[0].Port),
+		}
+	}
+	return addresses
+}
+
+func (r *reactorTestSuite) sendRequest(t *testing.T, fromNode, toNode int, v2 bool) {
+	to, from := r.checkNodePair(t, toNode, fromNode)
+	if v2 {
+		r.pexChannels[from].Out <- p2p.Envelope{
+			To:      to,
+			Message: &proto.PexRequestV2{},
+		}
+	} else {
+		r.pexChannels[from].Out <- p2p.Envelope{
+			To:      to,
+			Message: &proto.PexRequest{},
+		}
+	}
+}
+
+func (r *reactorTestSuite) sendResponse(
+	t *testing.T,
+	fromNode, toNode int,
+	withNodes []int,
+	v2 bool,
+) {
+	from, to := r.checkNodePair(t, fromNode, toNode)
+	if v2 {
+		addrs := r.getV2AddressesFor(withNodes)
+		r.pexChannels[from].Out <- p2p.Envelope{
+			To: to,
+			Message: &proto.PexResponseV2{
+				Addresses: addrs,
+			},
+		}
+	} else {
+		addrs := r.getAddressesFor(t, withNodes)
+		r.pexChannels[from].Out <- p2p.Envelope{
+			To: to,
+			Message: &proto.PexResponse{
+				Addresses: addrs,
+			},
+		}
+	}
+}
+
+func (r *reactorTestSuite) requireNumberOfPeers(
+	t *testing.T,
+	nodeIndex, numPeers int,
+	waitPeriod time.Duration,
+) {
+	require.Eventuallyf(t, func() bool {
+		actualNumPeers := len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers())
+		return actualNumPeers >= numPeers
+	}, waitPeriod, checkFrequency, "node failed to connect with the expected number of peers "+
+		"index=%d, node=%q, waitPeriod=%s expected=%d actual=%d",
+		nodeIndex, r.nodes[nodeIndex], waitPeriod, numPeers,
+		len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers()),
+	)
+}
+
+func (r *reactorTestSuite) connectAll(t *testing.T) {
+	r.connectN(t, r.total-1)
+}
+
+// connects all nodes with n other nodes
+func (r *reactorTestSuite) connectN(t *testing.T, n int) {
+	if n >= r.total {
+		require.Fail(t, "connectN: n must be less than the size of the network")
+	}
+
+	for i := 0; i < r.total; i++ {
+		for j := 0; j < n; j++ {
+			r.connectPeers(t, i, (i+j+1)%r.total)
+		}
+	}
+}
+
+// connects node1 to node2
+func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int) {
+	t.Helper()
+	node1, node2 := r.checkNodePair(t, sourceNode, targetNode)
+	r.logger.Info("connecting peers", "sourceNode", sourceNode, "targetNode", targetNode)
+
+	n1 := r.network.Nodes[node1]
+	if n1 == nil {
+		require.Fail(t, "connectPeers: source node %v is not part of the testnet", node1)
+		return
+	}
+
+	n2 := r.network.Nodes[node2]
+	if n2 == nil {
+		require.Fail(t, "connectPeers: target node %v is not part of the testnet", node2)
+		return
+	}
+
+	sourceSub := n1.PeerManager.Subscribe()
+	defer sourceSub.Close()
+	targetSub := n2.PeerManager.Subscribe()
+	defer targetSub.Close()
+
+	sourceAddress := n1.NodeAddress
+	r.logger.Debug("source address", "address", sourceAddress)
+	targetAddress := n2.NodeAddress
+	r.logger.Debug("target address", "address", targetAddress)
+
+	added, err := n1.PeerManager.Add(targetAddress)
+	require.NoError(t, err)
+
+	if !added {
+		r.logger.Debug("nodes already know about one another",
+			"sourceNode", sourceNode, "targetNode", targetNode)
+		return
+	}
+
+	select {
+	case peerUpdate := <-targetSub.Updates():
+		require.Equal(t, p2p.PeerUpdate{
+			NodeID: node1,
+			Status: p2p.PeerStatusUp,
+		}, peerUpdate)
+		r.logger.Debug("target connected with source")
+	case <-time.After(time.Second):
+		require.Fail(t, "timed out waiting for peer", "%v accepting %v",
+			targetNode, sourceNode)
+	}
+
+	select {
+	case peerUpdate := <-sourceSub.Updates():
+		require.Equal(t, p2p.PeerUpdate{
+			NodeID: node2,
+			Status: p2p.PeerStatusUp,
+		}, peerUpdate)
+		r.logger.Debug("source connected with target")
+	case <-time.After(time.Second):
+		require.Fail(t, "timed out waiting for peer", "%v dialing %v",
+			sourceNode, targetNode)
+	}
+
+	added, err = n2.PeerManager.Add(sourceAddress)
+	require.NoError(t, err)
+	require.True(t, added)
+}
+
+// nolint: unused
+func (r *reactorTestSuite) pexAddresses(t *testing.T, nodeIndices []int) []proto.PexAddress {
+	var addresses []proto.PexAddress
+	for _, i := range nodeIndices {
+		if i >= len(r.nodes) {
+			require.Fail(t, "index for pex address is greater than number of nodes")
+		}
+		nodeAddrs := r.network.Nodes[r.nodes[i]].NodeAddress
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		endpoints, err := nodeAddrs.Resolve(ctx)
+		cancel()
+		require.NoError(t, err)
+		for _, endpoint := range endpoints {
+			if endpoint.IP != nil {
+				addresses = append(addresses, proto.PexAddress{
+					ID:   string(nodeAddrs.NodeID),
+					IP:   endpoint.IP.String(),
+					Port: uint32(endpoint.Port),
+				})
+			}
+		}
+	}
+	return addresses
+}
+
+func (r *reactorTestSuite) checkNodePair(t *testing.T, first, second int) (p2p.NodeID, p2p.NodeID) {
+	require.NotEqual(t, first, second)
+	require.Less(t, first, r.total)
+	require.Less(t, second, r.total)
+	return r.nodes[first], r.nodes[second]
+}
+
+func (r *reactorTestSuite) addAddresses(t *testing.T, node int, addrs []int) {
+	peerManager := r.network.Nodes[r.nodes[node]].PeerManager
+	for _, addr := range addrs {
+		require.Less(t, addr, r.total)
+		address := r.network.Nodes[r.nodes[addr]].NodeAddress
+		added, err := peerManager.Add(address)
+		require.NoError(t, err)
+		require.True(t, added)
+	}
+}
diff --git a/p2p/router_test.go b/p2p/router_test.go
index 6e6acc828059..91f2474a852b 100644
--- a/p2p/router_test.go
+++ b/p2p/router_test.go
@@ -573,7 +573,9 @@ func TestRouter_DialPeers(t *testing.T) {
 	require.NoError(t, err)
 	defer peerManager.Close()

-	require.NoError(t, peerManager.Add(address))
+	added, err := peerManager.Add(address)
+	require.NoError(t, err)
+	require.True(t, added)

 	sub := peerManager.Subscribe()
 	defer sub.Close()
@@ -648,9 +650,17 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
 	require.NoError(t, err)
 	defer peerManager.Close()

-	require.NoError(t, peerManager.Add(a))
-	require.NoError(t, peerManager.Add(b))
-	require.NoError(t, peerManager.Add(c))
+	added, err := peerManager.Add(a)
+	require.NoError(t, err)
+	require.True(t, added)
+
+	added, err = peerManager.Add(b)
+	require.NoError(t, err)
+	require.True(t, added)
+
+	added, err = peerManager.Add(c)
+	require.NoError(t, err)
+	require.True(t, added)

 	router, err := p2p.NewRouter(
 		log.TestingLogger(),
diff --git a/p2p/transport_memory.go b/p2p/transport_memory.go
index f1f04ac8574f..31d03a2bb803 100644
--- a/p2p/transport_memory.go
+++ b/p2p/transport_memory.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net"
 	"sync"

 	"github.com/tendermint/tendermint/crypto"
@@ -130,6 +131,10 @@ func (t *MemoryTransport) Endpoints() []Endpoint {
 		return []Endpoint{{
 			Protocol: MemoryProtocol,
 			Path:     string(t.nodeID),
+			// An arbitrary IP and port are used so that pex reactors can
+			// exchange addresses with one another.
+			IP:   net.IPv4zero,
+			Port: 0,
 		}}
 	}
 }
@@ -153,6 +158,10 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti
 	if endpoint.Path == "" {
 		return nil, errors.New("no path")
 	}
+	if err := endpoint.Validate(); err != nil {
+		return nil, err
+	}
+
 	nodeID, err := NewNodeID(endpoint.Path)
 	if err != nil {
 		return nil, err
diff --git a/p2p/transport_test.go b/p2p/transport_test.go
index 657cb4b356a5..ecc2f5536fe9 100644
--- a/p2p/transport_test.go
+++ b/p2p/transport_test.go
@@ -113,7 +113,7 @@ func TestTransport_DialEndpoints(t *testing.T) {
 	require.Error(t, err)

 	// Tests for networked endpoints (with IP).
- if len(endpoint.IP) > 0 { + if len(endpoint.IP) > 0 && endpoint.Protocol != p2p.MemoryProtocol { for _, tc := range ipTestCases { tc := tc t.Run(tc.ip.String(), func(t *testing.T) { @@ -124,7 +124,7 @@ func TestTransport_DialEndpoints(t *testing.T) { require.NoError(t, conn.Close()) require.NoError(t, err) } else { - require.Error(t, err) + require.Error(t, err, "endpoint=%s", e) } }) } diff --git a/proto/tendermint/p2p/pex.go b/proto/tendermint/p2p/pex.go index bd57ae6516f2..8ba8cd2b2eb2 100644 --- a/proto/tendermint/p2p/pex.go +++ b/proto/tendermint/p2p/pex.go @@ -13,8 +13,12 @@ func (m *PexMessage) Wrap(pb proto.Message) error { m.Sum = &PexMessage_PexRequest{PexRequest: msg} case *PexResponse: m.Sum = &PexMessage_PexResponse{PexResponse: msg} + case *PexRequestV2: + m.Sum = &PexMessage_PexRequestV2{PexRequestV2: msg} + case *PexResponseV2: + m.Sum = &PexMessage_PexResponseV2{PexResponseV2: msg} default: - return fmt.Errorf("unknown message: %T", msg) + return fmt.Errorf("unknown pex message: %T", msg) } return nil } @@ -27,7 +31,11 @@ func (m *PexMessage) Unwrap() (proto.Message, error) { return msg.PexRequest, nil case *PexMessage_PexResponse: return msg.PexResponse, nil + case *PexMessage_PexRequestV2: + return msg.PexRequestV2, nil + case *PexMessage_PexResponseV2: + return msg.PexResponseV2, nil default: - return nil, fmt.Errorf("unknown message: %T", msg) + return nil, fmt.Errorf("unknown pex message: %T", msg) } } diff --git a/proto/tendermint/p2p/pex.pb.go b/proto/tendermint/p2p/pex.pb.go index fff1b5db87c4..63882c364313 100644 --- a/proto/tendermint/p2p/pex.pb.go +++ b/proto/tendermint/p2p/pex.pb.go @@ -163,10 +163,136 @@ func (m *PexResponse) GetAddresses() []PexAddress { return nil } +type PexAddressV2 struct { + URL string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"` +} + +func (m *PexAddressV2) Reset() { *m = PexAddressV2{} } +func (m *PexAddressV2) String() string { return proto.CompactTextString(m) } +func (*PexAddressV2) ProtoMessage() {} +func (*PexAddressV2) Descriptor() ([]byte, []int) { + return fileDescriptor_81c2f011fd13be57, []int{3} +} +func (m *PexAddressV2) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PexAddressV2) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PexAddressV2.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PexAddressV2) XXX_Merge(src proto.Message) { + xxx_messageInfo_PexAddressV2.Merge(m, src) +} +func (m *PexAddressV2) XXX_Size() int { + return m.Size() +} +func (m *PexAddressV2) XXX_DiscardUnknown() { + xxx_messageInfo_PexAddressV2.DiscardUnknown(m) +} + +var xxx_messageInfo_PexAddressV2 proto.InternalMessageInfo + +func (m *PexAddressV2) GetURL() string { + if m != nil { + return m.URL + } + return "" +} + +type PexRequestV2 struct { +} + +func (m *PexRequestV2) Reset() { *m = PexRequestV2{} } +func (m *PexRequestV2) String() string { return proto.CompactTextString(m) } +func (*PexRequestV2) ProtoMessage() {} +func (*PexRequestV2) Descriptor() ([]byte, []int) { + return fileDescriptor_81c2f011fd13be57, []int{4} +} +func (m *PexRequestV2) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PexRequestV2) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PexRequestV2.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := 
m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PexRequestV2) XXX_Merge(src proto.Message) { + xxx_messageInfo_PexRequestV2.Merge(m, src) +} +func (m *PexRequestV2) XXX_Size() int { + return m.Size() +} +func (m *PexRequestV2) XXX_DiscardUnknown() { + xxx_messageInfo_PexRequestV2.DiscardUnknown(m) +} + +var xxx_messageInfo_PexRequestV2 proto.InternalMessageInfo + +type PexResponseV2 struct { + Addresses []PexAddressV2 `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses"` +} + +func (m *PexResponseV2) Reset() { *m = PexResponseV2{} } +func (m *PexResponseV2) String() string { return proto.CompactTextString(m) } +func (*PexResponseV2) ProtoMessage() {} +func (*PexResponseV2) Descriptor() ([]byte, []int) { + return fileDescriptor_81c2f011fd13be57, []int{5} +} +func (m *PexResponseV2) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PexResponseV2) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PexResponseV2.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PexResponseV2) XXX_Merge(src proto.Message) { + xxx_messageInfo_PexResponseV2.Merge(m, src) +} +func (m *PexResponseV2) XXX_Size() int { + return m.Size() +} +func (m *PexResponseV2) XXX_DiscardUnknown() { + xxx_messageInfo_PexResponseV2.DiscardUnknown(m) +} + +var xxx_messageInfo_PexResponseV2 proto.InternalMessageInfo + +func (m *PexResponseV2) GetAddresses() []PexAddressV2 { + if m != nil { + return m.Addresses + } + return nil +} + type PexMessage struct { // Types that are valid to be assigned to Sum: // *PexMessage_PexRequest // *PexMessage_PexResponse + // *PexMessage_PexRequestV2 + // *PexMessage_PexResponseV2 Sum isPexMessage_Sum `protobuf_oneof:"sum"` } @@ -174,7 +300,7 @@ func (m *PexMessage) Reset() { *m = PexMessage{} } func (m *PexMessage) String() string { return proto.CompactTextString(m) } func (*PexMessage) ProtoMessage() {} func (*PexMessage) Descriptor() ([]byte, []int) { - return fileDescriptor_81c2f011fd13be57, []int{3} + return fileDescriptor_81c2f011fd13be57, []int{6} } func (m *PexMessage) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -215,9 +341,17 @@ type PexMessage_PexRequest struct { type PexMessage_PexResponse struct { PexResponse *PexResponse `protobuf:"bytes,2,opt,name=pex_response,json=pexResponse,proto3,oneof" json:"pex_response,omitempty"` } +type PexMessage_PexRequestV2 struct { + PexRequestV2 *PexRequestV2 `protobuf:"bytes,3,opt,name=pex_request_v2,json=pexRequestV2,proto3,oneof" json:"pex_request_v2,omitempty"` +} +type PexMessage_PexResponseV2 struct { + PexResponseV2 *PexResponseV2 `protobuf:"bytes,4,opt,name=pex_response_v2,json=pexResponseV2,proto3,oneof" json:"pex_response_v2,omitempty"` +} -func (*PexMessage_PexRequest) isPexMessage_Sum() {} -func (*PexMessage_PexResponse) isPexMessage_Sum() {} +func (*PexMessage_PexRequest) isPexMessage_Sum() {} +func (*PexMessage_PexResponse) isPexMessage_Sum() {} +func (*PexMessage_PexRequestV2) isPexMessage_Sum() {} +func (*PexMessage_PexResponseV2) isPexMessage_Sum() {} func (m *PexMessage) GetSum() isPexMessage_Sum { if m != nil { @@ -240,11 +374,27 @@ func (m *PexMessage) GetPexResponse() *PexResponse { return nil } +func (m *PexMessage) GetPexRequestV2() *PexRequestV2 { + if x, ok := m.GetSum().(*PexMessage_PexRequestV2); ok { + return x.PexRequestV2 + } + return nil +} + +func (m 
*PexMessage) GetPexResponseV2() *PexResponseV2 { + if x, ok := m.GetSum().(*PexMessage_PexResponseV2); ok { + return x.PexResponseV2 + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*PexMessage) XXX_OneofWrappers() []interface{} { return []interface{}{ (*PexMessage_PexRequest)(nil), (*PexMessage_PexResponse)(nil), + (*PexMessage_PexRequestV2)(nil), + (*PexMessage_PexResponseV2)(nil), } } @@ -252,33 +402,42 @@ func init() { proto.RegisterType((*PexAddress)(nil), "tendermint.p2p.PexAddress") proto.RegisterType((*PexRequest)(nil), "tendermint.p2p.PexRequest") proto.RegisterType((*PexResponse)(nil), "tendermint.p2p.PexResponse") + proto.RegisterType((*PexAddressV2)(nil), "tendermint.p2p.PexAddressV2") + proto.RegisterType((*PexRequestV2)(nil), "tendermint.p2p.PexRequestV2") + proto.RegisterType((*PexResponseV2)(nil), "tendermint.p2p.PexResponseV2") proto.RegisterType((*PexMessage)(nil), "tendermint.p2p.PexMessage") } func init() { proto.RegisterFile("tendermint/p2p/pex.proto", fileDescriptor_81c2f011fd13be57) } var fileDescriptor_81c2f011fd13be57 = []byte{ - // 310 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x51, 0x31, 0x4b, 0xc3, 0x40, - 0x18, 0xbd, 0x4b, 0x6b, 0xa1, 0x97, 0xea, 0x70, 0x88, 0x84, 0x0a, 0xd7, 0x92, 0xa9, 0x53, 0x02, - 0x11, 0x47, 0x45, 0x83, 0x43, 0x1d, 0x8a, 0xe5, 0x46, 0x17, 0x69, 0xcd, 0x47, 0xcc, 0xd0, 0xde, - 0x67, 0xee, 0x0a, 0xfd, 0x19, 0x0e, 0xfe, 0xa8, 0x8e, 0x1d, 0x9d, 0x8a, 0xa4, 0x7f, 0x44, 0xbc, - 0x13, 0x93, 0x42, 0xb7, 0x7b, 0xef, 0xfb, 0xde, 0xfb, 0xde, 0xf1, 0x58, 0x60, 0x60, 0x99, 0x41, - 0xb9, 0x28, 0x96, 0x26, 0xc6, 0x04, 0x63, 0x84, 0x75, 0x84, 0xa5, 0x32, 0x8a, 0x9f, 0xd5, 0x93, - 0x08, 0x13, 0xec, 0x9f, 0xe7, 0x2a, 0x57, 0x76, 0x14, 0xff, 0xbe, 0xdc, 0x56, 0x38, 0x65, 0x6c, - 0x0a, 0xeb, 0xfb, 0x2c, 0x2b, 0x41, 0x6b, 0x7e, 0xc1, 0xbc, 0x22, 0x0b, 0xe8, 0x90, 0x8e, 0xba, - 0x69, 0xa7, 0xda, 0x0d, 0xbc, 0xc7, 0x07, 0xe9, 0x15, 0x99, 0xe5, 0x31, 0xf0, 0x1a, 0xfc, 0x54, - 0x7a, 0x05, 0x72, 0xce, 0xda, 0xa8, 0x4a, 0x13, 0xb4, 0x86, 0x74, 0x74, 0x2a, 0xed, 0x3b, 0xec, - 0x59, 0x47, 0x09, 0xef, 0x2b, 0xd0, 0x26, 0x9c, 0x30, 0xdf, 0x22, 0x8d, 0x6a, 0xa9, 0x81, 0xdf, - 0xb2, 0xee, 0xcc, 0xdd, 0x02, 0x1d, 0xd0, 0x61, 0x6b, 0xe4, 0x27, 0xfd, 0xe8, 0x30, 0x68, 0x54, - 0xe7, 0x49, 0xdb, 0x9b, 0xdd, 0x80, 0xc8, 0x5a, 0x12, 0x7e, 0x52, 0xeb, 0x3e, 0x01, 0xad, 0x67, - 0x39, 0xf0, 0x1b, 0xe6, 0x23, 0xac, 0x5f, 0x4a, 0x77, 0xcc, 0x06, 0x3f, 0x6e, 0xf8, 0x17, 0x67, - 0x4c, 0x24, 0xc3, 0x7f, 0xc4, 0xef, 0x58, 0xcf, 0xc9, 0x5d, 0x3a, 0xfb, 0x41, 0x3f, 0xb9, 0x3c, - 0xaa, 0x77, 0x2b, 0x63, 0x22, 0x7d, 0xac, 0x61, 0x7a, 0xc2, 0x5a, 0x7a, 0xb5, 0x48, 0x9f, 0x36, - 0x95, 0xa0, 0xdb, 0x4a, 0xd0, 0xef, 0x4a, 0xd0, 0x8f, 0xbd, 0x20, 0xdb, 0xbd, 0x20, 0x5f, 0x7b, - 0x41, 0x9e, 0xaf, 0xf3, 0xc2, 0xbc, 0xad, 0xe6, 0xd1, 0xab, 0x5a, 0xc4, 0x8d, 0xaa, 0x9a, 0xad, - 0xd9, 0x4a, 0x0e, 0x6b, 0x9c, 0x77, 0x2c, 0x7b, 0xf5, 0x13, 0x00, 0x00, 0xff, 0xff, 0xa6, 0xa1, - 0x59, 0x3c, 0xdf, 0x01, 0x00, 0x00, + // 407 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xdd, 0x8a, 0xda, 0x40, + 0x14, 0xc7, 0xf3, 0x61, 0x2d, 0x9e, 0x44, 0x0b, 0x43, 0x29, 0xa9, 0x6d, 0xa3, 0xe4, 0xca, 0xde, + 0x24, 0x30, 0xa5, 0x97, 0x2d, 0x36, 0x08, 0xb5, 0x50, 0xa9, 0x1d, 0xd8, 0x5c, 0xec, 0x8d, 0xe8, + 0x66, 0xc8, 0x06, 0x56, 0x33, 0x9b, 0x49, 0x16, 0x1f, 0x63, 0xdf, 0x61, 0x5f, 0xc6, 0x4b, 0x2f, + 0xf7, 0x4a, 0x96, 0xf8, 0x22, 0x8b, 0x13, 0x31, 
0x23, 0xba, 0x7b, 0x37, 0xe7, 0x7f, 0xbe, 0x7e, + 0xe7, 0xcc, 0x01, 0x2b, 0xa3, 0x8b, 0x90, 0xa6, 0xf3, 0x78, 0x91, 0x79, 0x0c, 0x33, 0x8f, 0xd1, + 0xa5, 0xcb, 0xd2, 0x24, 0x4b, 0x50, 0xab, 0xf2, 0xb8, 0x0c, 0xb3, 0xf6, 0xfb, 0x28, 0x89, 0x12, + 0xe1, 0xf2, 0x76, 0xaf, 0x32, 0xca, 0x19, 0x03, 0x8c, 0xe9, 0xf2, 0x57, 0x18, 0xa6, 0x94, 0x73, + 0xf4, 0x01, 0xb4, 0x38, 0xb4, 0xd4, 0xae, 0xda, 0x6b, 0xf8, 0xf5, 0x62, 0xd3, 0xd1, 0xfe, 0x0c, + 0x88, 0x16, 0x87, 0x42, 0x67, 0x96, 0x26, 0xe9, 0x63, 0xa2, 0xc5, 0x0c, 0x21, 0xa8, 0xb1, 0x24, + 0xcd, 0x2c, 0xbd, 0xab, 0xf6, 0x9a, 0x44, 0xbc, 0x1d, 0x53, 0x54, 0x24, 0xf4, 0x36, 0xa7, 0x3c, + 0x73, 0x46, 0x60, 0x08, 0x8b, 0xb3, 0x64, 0xc1, 0x29, 0xfa, 0x09, 0x8d, 0x69, 0xd9, 0x8b, 0x72, + 0x4b, 0xed, 0xea, 0x3d, 0x03, 0xb7, 0xdd, 0x63, 0x50, 0xb7, 0xe2, 0xf1, 0x6b, 0xab, 0x4d, 0x47, + 0x21, 0x55, 0x8a, 0xf3, 0x15, 0xcc, 0xca, 0x1d, 0x60, 0xf4, 0x11, 0xf4, 0x3c, 0xbd, 0xd9, 0x13, + 0xbf, 0x2d, 0x36, 0x1d, 0xfd, 0x82, 0xfc, 0x25, 0x3b, 0xcd, 0x69, 0x89, 0xd0, 0x3d, 0x47, 0x80, + 0x9d, 0xff, 0xd0, 0x94, 0x48, 0x02, 0x8c, 0xfa, 0xa7, 0x2c, 0x9f, 0x5f, 0x66, 0x09, 0xf0, 0x29, + 0xcd, 0x83, 0x26, 0x66, 0x1d, 0x51, 0xce, 0xa7, 0x11, 0x45, 0x3f, 0xc0, 0x60, 0x74, 0x39, 0x49, + 0xcb, 0x96, 0x02, 0xea, 0xfc, 0x78, 0x7b, 0xa8, 0xa1, 0x42, 0x80, 0x1d, 0x2c, 0xd4, 0x07, 0xb3, + 0x4c, 0x2f, 0x09, 0xc5, 0xba, 0x0d, 0xfc, 0xe9, 0x6c, 0x7e, 0x19, 0x32, 0x54, 0x88, 0xc1, 0xa4, + 0xed, 0x0e, 0xa0, 0x25, 0x01, 0x4c, 0xee, 0xb0, 0xf8, 0x98, 0xf3, 0x63, 0x1d, 0x16, 0x33, 0x54, + 0x88, 0xc9, 0x24, 0x1b, 0xfd, 0x86, 0x77, 0x32, 0xc7, 0xae, 0x4c, 0x4d, 0x94, 0xf9, 0xf2, 0x0a, + 0x8a, 0xa8, 0xd3, 0x64, 0xb2, 0xe0, 0xbf, 0x01, 0x9d, 0xe7, 0x73, 0xff, 0xdf, 0xaa, 0xb0, 0xd5, + 0x75, 0x61, 0xab, 0x4f, 0x85, 0xad, 0xde, 0x6f, 0x6d, 0x65, 0xbd, 0xb5, 0x95, 0xc7, 0xad, 0xad, + 0x5c, 0x7e, 0x8f, 0xe2, 0xec, 0x3a, 0x9f, 0xb9, 0x57, 0xc9, 0xdc, 0x93, 0xee, 0x58, 0x3e, 0x69, + 0x71, 0xaf, 0xc7, 0x37, 0x3e, 0xab, 0x0b, 0xf5, 0xdb, 0x73, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9f, + 0x9b, 0xfd, 0x75, 0xfc, 0x02, 0x00, 0x00, } func (m *PexAddress) Marshal() (dAtA []byte, err error) { @@ -383,6 +542,96 @@ func (m *PexResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *PexAddressV2) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PexAddressV2) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PexAddressV2) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.URL) > 0 { + i -= len(m.URL) + copy(dAtA[i:], m.URL) + i = encodeVarintPex(dAtA, i, uint64(len(m.URL))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *PexRequestV2) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PexRequestV2) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PexRequestV2) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *PexResponseV2) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != 
nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PexResponseV2) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PexResponseV2) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Addresses) > 0 { + for iNdEx := len(m.Addresses) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Addresses[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintPex(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *PexMessage) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -457,6 +706,48 @@ func (m *PexMessage_PexResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) } return len(dAtA) - i, nil } +func (m *PexMessage_PexRequestV2) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PexMessage_PexRequestV2) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.PexRequestV2 != nil { + { + size, err := m.PexRequestV2.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintPex(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + return len(dAtA) - i, nil +} +func (m *PexMessage_PexResponseV2) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PexMessage_PexResponseV2) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.PexResponseV2 != nil { + { + size, err := m.PexResponseV2.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintPex(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} func encodeVarintPex(dAtA []byte, offset int, v uint64) int { offset -= sovPex(v) base := offset @@ -512,6 +803,43 @@ func (m *PexResponse) Size() (n int) { return n } +func (m *PexAddressV2) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.URL) + if l > 0 { + n += 1 + l + sovPex(uint64(l)) + } + return n +} + +func (m *PexRequestV2) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *PexResponseV2) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Addresses) > 0 { + for _, e := range m.Addresses { + l = e.Size() + n += 1 + l + sovPex(uint64(l)) + } + } + return n +} + func (m *PexMessage) Size() (n int) { if m == nil { return 0 @@ -548,6 +876,30 @@ func (m *PexMessage_PexResponse) Size() (n int) { } return n } +func (m *PexMessage_PexRequestV2) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.PexRequestV2 != nil { + l = m.PexRequestV2.Size() + n += 1 + l + sovPex(uint64(l)) + } + return n +} +func (m *PexMessage_PexResponseV2) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.PexResponseV2 != nil { + l = m.PexResponseV2.Size() + n += 1 + l + sovPex(uint64(l)) + } + return n +} func sovPex(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 @@ -822,7 +1174,7 @@ func (m *PexResponse) Unmarshal(dAtA []byte) error { } return nil } -func (m *PexMessage) Unmarshal(dAtA []byte) error { +func (m *PexAddressV2) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -845,17 +1197,17 @@ func (m *PexMessage) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 
{ - return fmt.Errorf("proto: PexMessage: wiretype end group for non-group") + return fmt.Errorf("proto: PexAddressV2: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: PexMessage: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: PexAddressV2: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field PexRequest", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field URL", wireType) } - var msglen int + var stringLen uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowPex @@ -865,13 +1217,229 @@ func (m *PexMessage) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= int(b&0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { - return ErrInvalidLengthPex + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPex + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthPex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.URL = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipPex(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthPex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PexRequestV2) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PexRequestV2: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PexRequestV2: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipPex(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthPex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PexResponseV2) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PexResponseV2: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PexResponseV2: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addresses", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= 
int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPex + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthPex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addresses = append(m.Addresses, PexAddressV2{}) + if err := m.Addresses[len(m.Addresses)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipPex(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthPex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PexMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PexMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PexMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PexRequest", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPex } postIndex := iNdEx + msglen if postIndex < 0 { @@ -921,6 +1489,76 @@ func (m *PexMessage) Unmarshal(dAtA []byte) error { } m.Sum = &PexMessage_PexResponse{v} iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PexRequestV2", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPex + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthPex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &PexRequestV2{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Sum = &PexMessage_PexRequestV2{v} + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PexResponseV2", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPex + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthPex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &PexResponseV2{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Sum = &PexMessage_PexResponseV2{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPex(dAtA[iNdEx:]) diff --git 
a/proto/tendermint/p2p/pex.proto b/proto/tendermint/p2p/pex.proto index 48e1cfce3daf..4e630f85fcda 100644 --- a/proto/tendermint/p2p/pex.proto +++ b/proto/tendermint/p2p/pex.proto @@ -17,9 +17,22 @@ message PexResponse { repeated PexAddress addresses = 1 [(gogoproto.nullable) = false]; } +message PexAddressV2 { + string url = 1 [(gogoproto.customname) = "URL"]; +} + +message PexRequestV2 {} + +message PexResponseV2 { + repeated PexAddressV2 addresses = 1 [(gogoproto.nullable) = false]; +} + + message PexMessage { oneof sum { - PexRequest pex_request = 1; - PexResponse pex_response = 2; + PexRequest pex_request = 1; + PexResponse pex_response = 2; + PexRequestV2 pex_request_v2 = 3; + PexResponseV2 pex_response_v2 = 4; } }
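
Note on the request-rate controller in calculateNextRequestTime above: the following standalone sketch reproduces its arithmetic so the growth of the interval is easy to see. It is not part of the patch; minReceiveRequestInterval here is an assumed stand-in for the reactor's unexported constant, and nextInterval is a hypothetical helper name.

package main

import (
	"fmt"
	"time"
)

// Assumed stand-in for the reactor's unexported constant of the same name.
const minReceiveRequestInterval = 100 * time.Millisecond

// nextInterval (hypothetical) mirrors calculateNextRequestTime: baseTime
// shrinks as more peers become available (capped at 50), and the interval
// grows with the squared ratio of all received addresses to newly added ones.
func nextInterval(availablePeers, totalPeers, newPeers int) time.Duration {
	peers := availablePeers
	if peers > 50 {
		peers = 50
	}
	baseTime := minReceiveRequestInterval
	if peers > 0 {
		baseTime = minReceiveRequestInterval * 2 / time.Duration(peers)
	}
	// +1 on both terms avoids dividing by zero; the ratio is always >= 1
	// because every new peer is also counted in totalPeers.
	ratio := float64(totalPeers+1) / float64(newPeers+1)
	return baseTime * time.Duration(ratio*ratio)
}

func main() {
	// Early in discovery almost every address is new, so requests stay frequent.
	fmt.Println(nextInterval(10, 100, 90)) // 20ms
	// Once few addresses are new, the interval stretches toward its upper bound.
	fmt.Println(nextInterval(10, 100, 1)) // 51s
}

Because the ratio is squared, the interval stays near baseTime while discovery is productive and backs off sharply as soon as responses stop yielding new peers.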
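On the wire side, the Wrap and Unwrap helpers extended in proto/tendermint/p2p/pex.go route the new V2 messages through the same PexMessage oneof as the legacy ones. A minimal usage sketch under that assumption (the node ID and address below are fabricated for illustration):

package main

import (
	"fmt"

	protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

func main() {
	// A v2 response carries self-describing URL addresses instead of the
	// legacy ID/IP/Port triple (this address is made up for illustration).
	resp := &protop2p.PexResponseV2{
		Addresses: []protop2p.PexAddressV2{
			{URL: "mconn://f9baeaa15fedf5e1ef7448dd60f46c01f1a9e9c4@203.0.113.1:26656"},
		},
	}

	// Wrap stores the concrete message in the PexMessage oneof.
	var msg protop2p.PexMessage
	if err := msg.Wrap(resp); err != nil {
		panic(err)
	}

	// Unwrap recovers the concrete message on the receiving side.
	inner, err := msg.Unwrap()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", inner) // *p2p.PexResponseV2
}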