Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libp2p-dns] Implement /dnsaddr resolution. #1931

Merged
merged 4 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,35 @@ atomic = "0.5.0"
bytes = "1"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.27.2", path = "core", default-features = false }
libp2p-core = { version = "0.28.0", path = "core", default-features = false }
libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.27.2", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.29.0", path = "transports/noise", optional = true }
libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.27.1", path = "transports/plaintext", optional = true }
libp2p-plaintext = { version = "0.28.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.28.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" }
libp2p-uds = { version = "0.27.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.27.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.30.1", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.1", path = "misc/multiaddr" }
libp2p-uds = { version = "0.28.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.28.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.31.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.2", path = "misc/multiaddr" }
parking_lot = "0.11.0"
pin-project = "1.0.0"
smallvec = "1.6.1"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true }
libp2p-deflate = { version = "0.28.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true }
libp2p-tcp = { version = "0.28.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.29.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
10 changes: 9 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# 0.27.2 [unreleased]
# 0.28.0 [unreleased]

- `Network::dial()` understands `/p2p` addresses and `Transport::dial`
gets a "fully qualified" `/p2p` address when dialing a specific peer,
whether through the `Network::peer()` API or via `Network::dial()`
with a `/p2p` address.

- `Network::dial()` and `network::Peer::dial()` return a `DialError`
on error.

- Shorten and unify `Debug` impls of public keys.

Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.27.2"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
2 changes: 1 addition & 1 deletion core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>

/// Returns an iterator over all connected peers, i.e. those that have
/// at least one established connection in the pool.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
pub fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
self.established.keys()
}

Expand Down
78 changes: 66 additions & 12 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ where
&self.local_peer_id
}

/// Dials a multiaddress without expecting a particular remote peer ID.
/// Dials a [`Multiaddr`] that may or may not encapsulate a
/// specific expected remote peer ID.
///
/// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
Expand All @@ -225,15 +226,32 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
if let Ok(peer) = PeerId::try_from(ma) {
return self.dial_peer(DialingOpts {
peer,
address: address.clone(),
handler,
remaining: Vec::new(),
})
}
}

// The address does not specify an expected peer, so just try to dial it as-is,
// accepting any peer ID that the remote identifies as.
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
}
}
Expand Down Expand Up @@ -430,7 +448,7 @@ where

/// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
Expand Down Expand Up @@ -460,7 +478,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
<THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
) -> Result<ConnectionId, DialError>
where
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
Expand All @@ -478,23 +496,28 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;

let result = match transport.dial(addr.clone()) {
Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
};

if let Ok(id) = &result {
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
current: (*id, addr),
remaining: opts.remaining,
},
);
Expand Down Expand Up @@ -668,6 +691,37 @@ impl NetworkConfig {
}
}

/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr)
}
Ok(addr)
} else {
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
}
}

/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone, Debug)]
pub enum DialError {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit(ConnectionLimit),
/// The address being dialed is invalid, e.g. if it refers to a different
/// remote peer than the one being dialed.
InvalidAddress(Multiaddr),
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::{
error,
fmt,
};
use super::{Network, DialingOpts};
use super::{Network, DialingOpts, DialError};

/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
Expand Down Expand Up @@ -210,7 +210,7 @@ where
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
ConnectionLimit
DialError
>
where
I: IntoIterator<Item = Multiaddr>,
Expand All @@ -219,7 +219,9 @@ where
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit {
current: 0, limit: 0
}))
};

let id = network.dial_peer(DialingOpts {
Expand Down
19 changes: 7 additions & 12 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,19 +263,14 @@ impl Drop for Listener {

/// If the address is `/memory/n`, returns the value of `n`.
fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
let mut iter = a.iter();

let port = if let Some(Protocol::Memory(port)) = iter.next() {
port
} else {
return Err(());
};

if iter.next().is_some() {
return Err(());
let mut protocols = a.iter();
match protocols.next() {
Some(Protocol::Memory(port)) => match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(port),
_ => Err(())
}
_ => Err(())
}

Ok(port)
}

/// A channel represents an established, in-memory, logical connection between two endpoints.
Expand Down
16 changes: 10 additions & 6 deletions core/tests/connection_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
network::{NetworkEvent, NetworkConfig, ConnectionLimits},
network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError},
};
use rand::Rng;
use std::task::Poll;
Expand All @@ -47,12 +47,16 @@ fn max_outgoing() {
.expect("Unexpected connection limit.");
}

let err = network.peer(target.clone())
match network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");

assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
.expect_err("Unexpected dialing success.")
{
DialError::ConnectionLimit(err) => {
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}

let info = network.info();
assert_eq!(info.num_peers(), 0);
Expand Down
17 changes: 12 additions & 5 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr;
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
multiaddr::Protocol,
network::{NetworkEvent, NetworkConfig},
};
use rand::seq::SliceRandom;
Expand Down Expand Up @@ -70,7 +71,7 @@ fn deny_incoming_connec() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(&peer_id, swarm1.local_peer_id());
assert_eq!(multiaddr, address);
assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into())));
return Poll::Ready(Ok(()));
},
Poll::Ready(_) => unreachable!(),
Expand Down Expand Up @@ -162,21 +163,27 @@ fn dial_self_by_id() {
fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.

let target = PeerId::random();

let mut swarm = test_network(NetworkConfig::default());

let mut addresses = Vec::new();
for _ in 0 .. 3 {
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
addresses.push(multiaddr![
Ip4([0, 0, 0, 0]),
Tcp(rand::random::<u16>())
]);
}
for _ in 0 .. 5 {
addresses.push(multiaddr![Udp(rand::random::<u16>())]);
addresses.push(multiaddr![
Udp(rand::random::<u16>())
]);
}
addresses.shuffle(&mut rand::thread_rng());

let first = addresses[0].clone();
let rest = (&addresses[1..]).iter().cloned();

let target = PeerId::random();
swarm.peer(target.clone())
.dial(first, rest, TestHandler())
.unwrap();
Expand All @@ -191,7 +198,7 @@ fn multiple_addresses_err() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(peer_id, target);
let expected = addresses.remove(0);
let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into()));
assert_eq!(multiaddr, expected);
if addresses.is_empty() {
assert_eq!(attempts_remaining, 0);
Expand Down