From 6d3ab8a3debe8d69dcd004173999732f12d0da96 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 14 Oct 2021 06:46:34 +1100 Subject: [PATCH] protocols/identify: Assist in peer discovery based on reported listen addresses from other peers (#2232) Co-authored-by: Max Inden --- protocols/identify/CHANGELOG.md | 4 + protocols/identify/Cargo.toml | 1 + protocols/identify/src/identify.rs | 132 +++++++++++++++++++++++++++-- 3 files changed, 129 insertions(+), 8 deletions(-) diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index b05cafa75e8..1d7378d0771 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -5,6 +5,10 @@ - Update dependencies. +- Assist in peer discovery by returning reported listen addresses + of other peers from `addresses_of_peer`. + [PR 2232](https://github.com/libp2p/rust-libp2p/pull/2232) + # 0.30.0 [2021-07-12] - Update dependencies. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 6f6c37d12e2..f540dc4c687 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -14,6 +14,7 @@ futures = "0.3.1" libp2p-core = { version = "0.30.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.31.0", path = "../../swarm" } log = "0.4.1" +lru = "0.6" prost = "0.8" smallvec = "1.6.1" wasm-timer = "0.2" diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index d75dfb72054..cc157216b30 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -31,9 +31,11 @@ use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, ProtocolsHandlerUpgrErr, }; +use lru::LruCache; use std::{ collections::{HashMap, HashSet, VecDeque}, io, + iter::FromIterator, pin::Pin, task::Context, task::Poll, @@ -57,6 +59,8 @@ pub struct Identify { /// Peers to which an active push with current information about /// the local peer should be sent. pending_push: HashSet, + /// The addresses of all peers that we have discovered. + discovered_peers: LruCache>, } /// A pending reply to an inbound identification request. @@ -109,6 +113,10 @@ pub struct IdentifyConfig { /// /// Disabled by default. pub push_listen_addr_updates: bool, + + /// How many entries of discovered peers to keep before we discard + /// the least-recently used one. + pub cache_size: usize, } impl IdentifyConfig { @@ -122,6 +130,7 @@ impl IdentifyConfig { initial_delay: Duration::from_millis(500), interval: Duration::from_secs(5 * 60), push_listen_addr_updates: false, + cache_size: 100, } } @@ -152,17 +161,29 @@ impl IdentifyConfig { self.push_listen_addr_updates = b; self } + + /// Configures the size of the LRU cache, caching addresses of discovered peers. + /// + /// The [`Swarm`](libp2p_swarm::Swarm) may extend the set of addresses of an outgoing connection attempt via + /// [`Identify::addresses_of_peer`]. + pub fn with_cache_size(mut self, cache_size: usize) -> Self { + self.cache_size = cache_size; + self + } } impl Identify { /// Creates a new `Identify` network behaviour. pub fn new(config: IdentifyConfig) -> Self { + let discovered_peers = LruCache::new(config.cache_size); + Identify { config, connected: HashMap::new(), pending_replies: VecDeque::new(), events: VecDeque::new(), pending_push: HashSet::new(), + discovered_peers, } } @@ -254,6 +275,10 @@ impl NetworkBehaviour for Identify { ) { match event { IdentifyHandlerEvent::Identified(info) => { + // Replace existing addresses to prevent other peer from filling up our memory. + self.discovered_peers + .put(peer_id, HashSet::from_iter(info.listen_addrs.clone())); + let observed = info.observed_addr.clone(); self.events.push_back(NetworkBehaviourAction::GenerateEvent( IdentifyEvent::Received { peer_id, info }, @@ -388,6 +413,27 @@ impl NetworkBehaviour for Identify { Poll::Pending } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.discovered_peers + .get(peer) + .cloned() + .map(|addr| Vec::from_iter(addr)) + .unwrap_or_default() + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + _: &dyn std::error::Error, + ) { + if let Some(peer) = peer_id { + if let Some(entry) = self.discovered_peers.get_mut(peer) { + entry.remove(addr); + } + } + } } /// Event emitted by the `Identify` behaviour. @@ -552,11 +598,7 @@ mod tests { let (mut swarm1, pubkey1) = { let (pubkey, transport) = transport(); - let protocol = Identify::new( - IdentifyConfig::new("a".to_string(), pubkey.clone()) - // Delay identification requests so we can test the push protocol. - .with_initial_delay(Duration::from_secs(u32::MAX as u64)), - ); + let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone())); let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -565,9 +607,7 @@ mod tests { let (pubkey, transport) = transport(); let protocol = Identify::new( IdentifyConfig::new("a".to_string(), pubkey.clone()) - .with_agent_version("b".to_string()) - // Delay identification requests so we can test the push protocol. - .with_initial_delay(Duration::from_secs(u32::MAX as u64)), + .with_agent_version("b".to_string()), ); let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) @@ -626,4 +666,80 @@ mod tests { } }) } + + #[test] + fn discover_peer_after_disconnect() { + let _ = env_logger::try_init(); + + let mut swarm1 = { + let (pubkey, transport) = transport(); + let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone())); + + Swarm::new(transport, protocol, pubkey.to_peer_id()) + }; + + let mut swarm2 = { + let (pubkey, transport) = transport(); + let protocol = Identify::new( + IdentifyConfig::new("a".to_string(), pubkey.clone()) + .with_agent_version("b".to_string()), + ); + + Swarm::new(transport, protocol, pubkey.to_peer_id()) + }; + + let swarm1_peer_id = *swarm1.local_peer_id(); + + let listener = swarm1 + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + + let listen_addr = async_std::task::block_on(async { + loop { + match swarm1.select_next_some().await { + SwarmEvent::NewListenAddr { + address, + listener_id, + } if listener_id == listener => return address, + _ => {} + } + } + }); + + async_std::task::spawn(async move { + loop { + swarm1.next().await; + } + }); + + swarm2.dial_addr(listen_addr).unwrap(); + + // wait until we identified + async_std::task::block_on(async { + loop { + if let SwarmEvent::Behaviour(IdentifyEvent::Received { .. }) = + swarm2.select_next_some().await + { + break; + } + } + }); + + swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); + + // we should still be able to dial now! + swarm2.dial(&swarm1_peer_id).unwrap(); + + let connected_peer = async_std::task::block_on(async { + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = + swarm2.select_next_some().await + { + break peer_id; + } + } + }); + + assert_eq!(connected_peer, swarm1_peer_id); + } }