Skip to content

Commit

Permalink
protocols/identify: Assist in peer discovery based on reported listen…
Browse files Browse the repository at this point in the history
… addresses from other peers (#2232)

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
thomaseizinger and mxinden committed Oct 13, 2021
1 parent 7718d1d commit 6d3ab8a
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 8 deletions.
4 changes: 4 additions & 0 deletions protocols/identify/CHANGELOG.md
Expand Up @@ -5,6 +5,10 @@

- Update dependencies.

- Assist in peer discovery by returning reported listen addresses
of other peers from `addresses_of_peer`.
[PR 2232](https://github.com/libp2p/rust-libp2p/pull/2232)

# 0.30.0 [2021-07-12]

- Update dependencies.
Expand Down
1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Expand Up @@ -14,6 +14,7 @@ futures = "0.3.1"
libp2p-core = { version = "0.30.0", path = "../../core", default-features = false }
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
log = "0.4.1"
lru = "0.6"
prost = "0.8"
smallvec = "1.6.1"
wasm-timer = "0.2"
Expand Down
132 changes: 124 additions & 8 deletions protocols/identify/src/identify.rs
Expand Up @@ -31,9 +31,11 @@ use libp2p_swarm::{
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
ProtocolsHandlerUpgrErr,
};
use lru::LruCache;
use std::{
collections::{HashMap, HashSet, VecDeque},
io,
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
Expand All @@ -57,6 +59,8 @@ pub struct Identify {
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
/// The addresses of all peers that we have discovered.
discovered_peers: LruCache<PeerId, HashSet<Multiaddr>>,
}

/// A pending reply to an inbound identification request.
Expand Down Expand Up @@ -109,6 +113,10 @@ pub struct IdentifyConfig {
///
/// Disabled by default.
pub push_listen_addr_updates: bool,

/// How many entries of discovered peers to keep before we discard
/// the least-recently used one.
pub cache_size: usize,
}

impl IdentifyConfig {
Expand All @@ -122,6 +130,7 @@ impl IdentifyConfig {
initial_delay: Duration::from_millis(500),
interval: Duration::from_secs(5 * 60),
push_listen_addr_updates: false,
cache_size: 100,
}
}

Expand Down Expand Up @@ -152,17 +161,29 @@ impl IdentifyConfig {
self.push_listen_addr_updates = b;
self
}

/// Configures the size of the LRU cache, caching addresses of discovered peers.
///
/// The [`Swarm`](libp2p_swarm::Swarm) may extend the set of addresses of an outgoing connection attempt via
/// [`Identify::addresses_of_peer`].
pub fn with_cache_size(mut self, cache_size: usize) -> Self {
self.cache_size = cache_size;
self
}
}

impl Identify {
/// Creates a new `Identify` network behaviour.
pub fn new(config: IdentifyConfig) -> Self {
let discovered_peers = LruCache::new(config.cache_size);

Identify {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
}
}

Expand Down Expand Up @@ -254,6 +275,10 @@ impl NetworkBehaviour for Identify {
) {
match event {
IdentifyHandlerEvent::Identified(info) => {
// Replace existing addresses to prevent other peer from filling up our memory.
self.discovered_peers
.put(peer_id, HashSet::from_iter(info.listen_addrs.clone()));

let observed = info.observed_addr.clone();
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
IdentifyEvent::Received { peer_id, info },
Expand Down Expand Up @@ -388,6 +413,27 @@ impl NetworkBehaviour for Identify {

Poll::Pending
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.discovered_peers
.get(peer)
.cloned()
.map(|addr| Vec::from_iter(addr))
.unwrap_or_default()
}

fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
_: &dyn std::error::Error,
) {
if let Some(peer) = peer_id {
if let Some(entry) = self.discovered_peers.get_mut(peer) {
entry.remove(addr);
}
}
}
}

/// Event emitted by the `Identify` behaviour.
Expand Down Expand Up @@ -552,11 +598,7 @@ mod tests {

let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
);
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};
Expand All @@ -565,9 +607,7 @@ mod tests {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
.with_agent_version("b".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
Expand Down Expand Up @@ -626,4 +666,80 @@ mod tests {
}
})
}

#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();

let mut swarm1 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string()),
);

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let swarm1_peer_id = *swarm1.local_peer_id();

let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});

async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});

swarm2.dial_addr(listen_addr).unwrap();

// wait until we identified
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(IdentifyEvent::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});

swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();

// we should still be able to dial now!
swarm2.dial(&swarm1_peer_id).unwrap();

let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

assert_eq!(connected_peer, swarm1_peer_id);
}
}

0 comments on commit 6d3ab8a

Please sign in to comment.