Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libp2p-dns] Implement /dnsaddr resolution. #1931

Merged
merged 4 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 66 additions & 12 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ where
&self.local_peer_id
}

/// Dials a multiaddress without expecting a particular remote peer ID.
/// Dials a [`Multiaddr`] that may or may not encapsulate a
/// specific expected remote peer ID.
///
/// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
Expand All @@ -225,15 +226,32 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
if let Ok(peer) = PeerId::try_from(ma) {
return self.dial_peer(DialingOpts {
peer,
address: address.clone(),
handler,
remaining: Vec::new(),
})
}
}

// The address does not specify an expected peer, so just try to dial it as-is,
// accepting any peer ID that the remote identifies as.
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
}
}
Expand Down Expand Up @@ -430,7 +448,7 @@ where

/// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
Expand Down Expand Up @@ -460,7 +478,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
<THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
) -> Result<ConnectionId, DialError>
where
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
Expand All @@ -478,23 +496,28 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;

let result = match transport.dial(addr.clone()) {
Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
};

if let Ok(id) = &result {
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
current: (*id, addr),
remaining: opts.remaining,
},
);
Expand Down Expand Up @@ -668,6 +691,37 @@ impl NetworkConfig {
}
}

/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr)
}
Ok(addr)
} else {
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
}
}

/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone, Debug)]
pub enum DialError {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit(ConnectionLimit),
/// The address being dialed is invalid, e.g. if it refers to a different
/// remote peer than the one being dialed.
InvalidAddress(Multiaddr),
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::{
error,
fmt,
};
use super::{Network, DialingOpts};
use super::{Network, DialingOpts, DialError};

/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
Expand Down Expand Up @@ -210,7 +210,7 @@ where
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
ConnectionLimit
DialError
>
where
I: IntoIterator<Item = Multiaddr>,
Expand All @@ -219,7 +219,9 @@ where
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit {
current: 0, limit: 0
}))
};

let id = network.dial_peer(DialingOpts {
Expand Down
19 changes: 7 additions & 12 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,19 +263,14 @@ impl Drop for Listener {

/// If the address is `/memory/n`, returns the value of `n`.
fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
let mut iter = a.iter();

let port = if let Some(Protocol::Memory(port)) = iter.next() {
port
} else {
return Err(());
};

if iter.next().is_some() {
return Err(());
let mut protocols = a.iter();
match protocols.next() {
Some(Protocol::Memory(port)) => match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(port),
_ => Err(())
}
_ => Err(())
}

Ok(port)
}

/// A channel represents an established, in-memory, logical connection between two endpoints.
Expand Down
16 changes: 10 additions & 6 deletions core/tests/connection_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
network::{NetworkEvent, NetworkConfig, ConnectionLimits},
network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError},
};
use rand::Rng;
use std::task::Poll;
Expand All @@ -47,12 +47,16 @@ fn max_outgoing() {
.expect("Unexpected connection limit.");
}

let err = network.peer(target.clone())
match network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");

assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
.expect_err("Unexpected dialing success.")
{
DialError::ConnectionLimit(err) => {
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}

let info = network.info();
assert_eq!(info.num_peers(), 0);
Expand Down
17 changes: 12 additions & 5 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr;
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
multiaddr::Protocol,
network::{NetworkEvent, NetworkConfig},
};
use rand::seq::SliceRandom;
Expand Down Expand Up @@ -70,7 +71,7 @@ fn deny_incoming_connec() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(&peer_id, swarm1.local_peer_id());
assert_eq!(multiaddr, address);
assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into())));
return Poll::Ready(Ok(()));
},
Poll::Ready(_) => unreachable!(),
Expand Down Expand Up @@ -162,21 +163,27 @@ fn dial_self_by_id() {
fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.

let target = PeerId::random();

let mut swarm = test_network(NetworkConfig::default());

let mut addresses = Vec::new();
for _ in 0 .. 3 {
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
addresses.push(multiaddr![
Ip4([0, 0, 0, 0]),
Tcp(rand::random::<u16>())
]);
}
for _ in 0 .. 5 {
addresses.push(multiaddr![Udp(rand::random::<u16>())]);
addresses.push(multiaddr![
Udp(rand::random::<u16>())
]);
}
addresses.shuffle(&mut rand::thread_rng());

let first = addresses[0].clone();
let rest = (&addresses[1..]).iter().cloned();

let target = PeerId::random();
swarm.peer(target.clone())
.dial(first, rest, TestHandler())
.unwrap();
Expand All @@ -191,7 +198,7 @@ fn multiple_addresses_err() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(peer_id, target);
let expected = addresses.remove(0);
let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into()));
assert_eq!(multiaddr, expected);
if addresses.is_empty() {
assert_eq!(attempts_remaining, 0);
Expand Down
38 changes: 16 additions & 22 deletions examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

use async_std::task;
use libp2p::{
Multiaddr,
Swarm,
PeerId,
identity,
Expand All @@ -38,7 +39,14 @@ use libp2p::kad::{
QueryResult,
};
use libp2p::kad::record::store::MemoryStore;
use std::{env, error::Error, time::Duration};
use std::{env, error::Error, str::FromStr, time::Duration};

const BOOTNODES: [&'static str; 4] = [
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt"
];

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand All @@ -59,28 +67,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
let store = MemoryStore::new(local_peer_id.clone());
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg);

// TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967)
/*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/

// The only address that currently works.
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?);

// The following addresses always fail signature verification, possibly due to
// RSA keys with < 2048 bits.
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
// Add the bootnodes to the local routing table. `libp2p-dns` built
// into the `transport` resolves the `dnsaddr` when Kademlia tries
// to dial these nodes.
let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io")?;
for peer in &BOOTNODES {
behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone());
}

// The following addresses are permanently unreachable:
// Other(Other(A(Transport(A(Underlying(Os { code: 101, kind: Other, message: "Network is unreachable" }))))))
// behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
// behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
Swarm::new(transport, behaviour, local_peer_id)
};

Expand Down
10 changes: 10 additions & 0 deletions misc/multiaddr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ impl Multiaddr {

if replaced { Some(address) } else { None }
}

/// Checks whether the given `Multiaddr` is a suffix of this `Multiaddr`.
pub fn ends_with(&self, other: &Multiaddr) -> bool {
let n = self.bytes.len();
let m = other.bytes.len();
if n < m {
return false
}
self.bytes[(n - m) ..] == other.bytes[..]
}
}

impl fmt::Debug for Multiaddr {
Expand Down
12 changes: 12 additions & 0 deletions misc/multiaddr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ fn push_pop_identity() {
QuickCheck::new().quickcheck(prop as fn(Ma, Proto) -> bool)
}

#[test]
fn ends_with() {
fn prop(Ma(m): Ma) {
let n = m.iter().count();
for i in 0 .. n {
let suffix = m.iter().skip(i).collect::<Multiaddr>();
assert!(m.ends_with(&suffix));
}
}
QuickCheck::new().quickcheck(prop as fn(_))
}


// Arbitrary impls

Expand Down
2 changes: 1 addition & 1 deletion misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ where
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
*this.state = SeqState::SendProtocol { io, protocol }
}
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
}
}

Expand Down