From 411a0495c5baa8dd714385a68871118ac98bbb61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 12 Sep 2023 11:29:27 +0100 Subject: [PATCH] feat(upnp): add implementation based on IGD protocol Implements UPnP via the IGD protocol. The usage of IGD is an implementation detail and is planned to be extended to support NATpnp. Resolves: #3903. Pull-Request: #4156. --- CHANGELOG.md | 1 + Cargo.lock | 69 ++++ Cargo.toml | 3 + examples/upnp/Cargo.toml | 11 + examples/upnp/README.md | 23 ++ examples/upnp/src/main.rs | 81 +++++ libp2p/CHANGELOG.md | 4 + libp2p/Cargo.toml | 5 +- libp2p/src/lib.rs | 4 + protocols/upnp/CHANGELOG.md | 3 + protocols/upnp/Cargo.toml | 25 ++ protocols/upnp/src/behaviour.rs | 555 ++++++++++++++++++++++++++++++++ protocols/upnp/src/lib.rs | 37 +++ protocols/upnp/src/tokio.rs | 169 ++++++++++ 14 files changed, 989 insertions(+), 1 deletion(-) create mode 100644 examples/upnp/Cargo.toml create mode 100644 examples/upnp/README.md create mode 100644 examples/upnp/src/main.rs create mode 100644 protocols/upnp/CHANGELOG.md create mode 100644 protocols/upnp/Cargo.toml create mode 100644 protocols/upnp/src/behaviour.rs create mode 100644 protocols/upnp/src/lib.rs create mode 100644 protocols/upnp/src/tokio.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf9f7c9559..f6b32c35f97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - [`libp2p-relay` CHANGELOG](protocols/relay/CHANGELOG.md) - [`libp2p-request-response` CHANGELOG](protocols/request-response/CHANGELOG.md) - [`libp2p-rendezvous` CHANGELOG](protocols/rendezvous/CHANGELOG.md) +- [`libp2p-upnp` CHANGELOG](protocols/upnp/CHANGELOG.md) ## Transport Protocols & Upgrades diff --git a/Cargo.lock b/Cargo.lock index a5940fda828..6e1842797a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -443,6 +443,17 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" +[[package]] +name = "attohttpc" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" +dependencies = [ + "http", + "log", + "url", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -2013,6 +2024,25 @@ dependencies = [ "windows", ] +[[package]] +name = "igd-next" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e065e90a518ab5fedf79aa1e4b784e10f8e484a834f6bda85c42633a2cb7af" +dependencies = [ + "async-trait", + "attohttpc", + "bytes", + "futures", + "http", + "hyper", + "log", + "rand 0.8.5", + "tokio", + "url", + "xmltree", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -2278,6 +2308,7 @@ dependencies = [ "libp2p-tcp", "libp2p-tls", "libp2p-uds", + "libp2p-upnp", "libp2p-wasm-ext", "libp2p-websocket", "libp2p-webtransport-websys", @@ -3029,6 +3060,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "libp2p-upnp" +version = "0.1.0" +dependencies = [ + "futures", + "futures-timer", + "igd-next", + "libp2p-core", + "libp2p-swarm", + "log", + "tokio", + "void", +] + [[package]] name = "libp2p-wasm-ext" version = "0.40.0" @@ -5712,6 +5757,15 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "upnp-example" +version = "0.1.0" +dependencies = [ + "futures", + "libp2p", + "tokio", +] + [[package]] name = "url" version = "2.4.1" @@ -6381,6 +6435,21 @@ dependencies = [ "time", ] +[[package]] +name = "xml-rs" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1eee6bf5926be7cf998d7381a9a23d833fd493f6a8034658a9505a4dc4b20444" + +[[package]] +name = "xmltree" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d8a75eaf6557bb84a65ace8609883db44a29951042ada9b393151532e41fcb" +dependencies = [ + "xml-rs", +] + [[package]] name = "yamux" version = "0.12.0" diff --git a/Cargo.toml b/Cargo.toml index 55258267fec..99df44f569c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "examples/ping-example", "examples/relay-server", "examples/rendezvous", + "examples/upnp", "identity", "interop-tests", "misc/allow-block-list", @@ -40,6 +41,7 @@ members = [ "protocols/relay", "protocols/rendezvous", "protocols/request-response", + "protocols/upnp", "swarm", "swarm-derive", "swarm-test", @@ -90,6 +92,7 @@ libp2p-pnet = { version = "0.23.0", path = "transports/pnet" } libp2p-quic = { version = "0.9.2", path = "transports/quic" } libp2p-relay = { version = "0.16.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" } +libp2p-upnp = { version = "0.1.0", path = "protocols/upnp" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } libp2p-server = { version = "0.12.3", path = "misc/server" } libp2p-swarm = { version = "0.43.4", path = "swarm" } diff --git a/examples/upnp/Cargo.toml b/examples/upnp/Cargo.toml new file mode 100644 index 00000000000..afb8f61d2a2 --- /dev/null +++ b/examples/upnp/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "upnp-example" +version = "0.1.0" +edition = "2021" +publish = false +license = "MIT" + +[dependencies] +tokio = { version = "1", features = [ "rt-multi-thread", "macros"] } +futures = "0.3.28" +libp2p = { path = "../../libp2p", features = ["tokio", "dns", "macros", "noise", "ping", "tcp", "websocket", "yamux", "upnp"] } diff --git a/examples/upnp/README.md b/examples/upnp/README.md new file mode 100644 index 00000000000..48335bfa9fc --- /dev/null +++ b/examples/upnp/README.md @@ -0,0 +1,23 @@ +## Description + +The upnp example showcases how to use the upnp network behaviour to externally open ports on the network gateway. + + +## Usage + +To run the example, follow these steps: + +1. In a terminal window, run the following command: + + ```sh + cargo run + ``` + +2. This command will start the swarm and print the `NewExternalAddr` if the gateway supports `UPnP` or + `GatewayNotFound` if it doesn't. + + +## Conclusion + +The upnp example demonstrates the usage of **libp2p** to externally open a port on the gateway if it +supports [`UPnP`](https://en.wikipedia.org/wiki/Universal_Plug_and_Play). diff --git a/examples/upnp/src/main.rs b/examples/upnp/src/main.rs new file mode 100644 index 00000000000..b4350dc82ad --- /dev/null +++ b/examples/upnp/src/main.rs @@ -0,0 +1,81 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![doc = include_str!("../README.md")] + +use futures::prelude::*; +use libp2p::core::upgrade::Version; +use libp2p::{ + identity, noise, + swarm::{SwarmBuilder, SwarmEvent}, + tcp, upnp, yamux, Multiaddr, PeerId, Transport, +}; +use std::error::Error; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + println!("Local peer id: {local_peer_id:?}"); + + let transport = tcp::tokio::Transport::default() + .upgrade(Version::V1Lazy) + .authenticate(noise::Config::new(&local_key)?) + .multiplex(yamux::Config::default()) + .boxed(); + + let mut swarm = SwarmBuilder::with_tokio_executor( + transport, + upnp::tokio::Behaviour::default(), + local_peer_id, + ) + .build(); + + // Tell the swarm to listen on all interfaces and a random, OS-assigned + // port. + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + + // Dial the peer identified by the multi-address given as the second + // command-line argument, if any. + if let Some(addr) = std::env::args().nth(1) { + let remote: Multiaddr = addr.parse()?; + swarm.dial(remote)?; + println!("Dialed {addr}") + } + + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"), + SwarmEvent::Behaviour(upnp::Event::NewExternalAddr(addr)) => { + println!("New external address: {addr}"); + } + SwarmEvent::Behaviour(upnp::Event::GatewayNotFound) => { + println!("Gateway does not support UPnP"); + break; + } + SwarmEvent::Behaviour(upnp::Event::NonRoutableGateway) => { + println!("Gateway is not exposed directly to the public Internet, i.e. it itself has a private IP address."); + break; + } + _ => {} + } + } + Ok(()) +} diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 414c11058cb..9ae0d1c28aa 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -10,10 +10,14 @@ - Add `json` feature which exposes `request_response::json`. See [PR 4188]. +- Add support for UPnP via the IGD protocol. + See [PR 4156]. + - Add `libp2p-memory-connection-limits` providing memory usage based connection limit configurations. See [PR 4281]. [PR 4188]: https://github.com/libp2p/rust-libp2p/pull/4188 +[PR 4156]: https://github.com/libp2p/rust-libp2p/pull/4156 [PR 4217]: https://github.com/libp2p/rust-libp2p/pull/4217 [PR 4281]: https://github.com/libp2p/rust-libp2p/pull/4281 diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 5cb8741afa0..6b11d7deec4 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -50,6 +50,7 @@ full = [ "websocket", "webtransport-websys", "yamux", + "upnp" ] async-std = ["libp2p-swarm/async-std", "libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"] @@ -82,7 +83,7 @@ secp256k1 = ["libp2p-identity/secp256k1"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] tcp = ["dep:libp2p-tcp"] tls = ["dep:libp2p-tls"] -tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] +tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-upnp?/tokio"] uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "libp2p-swarm/wasm-bindgen", "libp2p-gossipsub?/wasm-bindgen"] wasm-ext = ["dep:libp2p-wasm-ext"] @@ -90,6 +91,7 @@ wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] websocket = ["dep:libp2p-websocket"] webtransport-websys = ["dep:libp2p-webtransport-websys"] yamux = ["dep:libp2p-yamux"] +upnp = ["dep:libp2p-upnp"] [dependencies] bytes = "1" @@ -133,6 +135,7 @@ libp2p-quic = { workspace = true, optional = true } libp2p-tcp = { workspace = true, optional = true } libp2p-tls = { workspace = true, optional = true } libp2p-uds = { workspace = true, optional = true } +libp2p-upnp = { workspace = true, optional = true } libp2p-websocket = { workspace = true, optional = true } [dev-dependencies] diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 349427ffdfe..624ff897c17 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -127,6 +127,10 @@ pub use libp2p_tls as tls; #[cfg(not(target_arch = "wasm32"))] #[doc(inline)] pub use libp2p_uds as uds; +#[cfg(feature = "upnp")] +#[cfg(not(target_arch = "wasm32"))] +#[doc(inline)] +pub use libp2p_upnp as upnp; #[cfg(feature = "wasm-ext")] #[doc(inline)] pub use libp2p_wasm_ext as wasm_ext; diff --git a/protocols/upnp/CHANGELOG.md b/protocols/upnp/CHANGELOG.md new file mode 100644 index 00000000000..60d76c2412c --- /dev/null +++ b/protocols/upnp/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.1.0 - unreleased + +- Initial version diff --git a/protocols/upnp/Cargo.toml b/protocols/upnp/Cargo.toml new file mode 100644 index 00000000000..ab20496aa4b --- /dev/null +++ b/protocols/upnp/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "libp2p-upnp" +edition = "2021" +rust-version = "1.60.0" +description = "UPnP support for libp2p transports" +version = "0.1.0" +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] +publish = false + +[dependencies] +futures = "0.3.28" +futures-timer = "3.0.2" +igd-next = "0.14.2" +libp2p-core = { workspace = true } +libp2p-swarm = { workspace = true } +log = "0.4.19" +void = "1.0.2" +tokio = { version = "1.29", default-features = false, features = ["rt"], optional = true } + +[features] +tokio = ["igd-next/aio_tokio", "dep:tokio"] + diff --git a/protocols/upnp/src/behaviour.rs b/protocols/upnp/src/behaviour.rs new file mode 100644 index 00000000000..f582e96e1e7 --- /dev/null +++ b/protocols/upnp/src/behaviour.rs @@ -0,0 +1,555 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +use std::{ + borrow::Borrow, + collections::{HashMap, VecDeque}, + error::Error, + hash::{Hash, Hasher}, + net::{self, IpAddr, SocketAddr, SocketAddrV4}, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use crate::tokio::{is_addr_global, Gateway}; +use futures::{channel::oneshot, Future, StreamExt}; +use futures_timer::Delay; +use igd_next::PortMappingProtocol; +use libp2p_core::{multiaddr, transport::ListenerId, Endpoint, Multiaddr}; +use libp2p_swarm::{ + derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm, + NetworkBehaviour, NewListenAddr, PollParameters, ToSwarm, +}; + +/// The duration in seconds of a port mapping on the gateway. +const MAPPING_DURATION: u32 = 3600; + +/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped. +const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2; + +/// A [`Gateway`] Request. +#[derive(Debug)] +pub(crate) enum GatewayRequest { + AddMapping { mapping: Mapping, duration: u32 }, + RemoveMapping(Mapping), +} + +/// A [`Gateway`] event. +#[derive(Debug)] +pub(crate) enum GatewayEvent { + /// Port was successfully mapped. + Mapped(Mapping), + /// There was a failure mapping port. + MapFailure(Mapping, Box), + /// Port was successfully removed. + Removed(Mapping), + /// There was a failure removing the mapped port. + RemovalFailure(Mapping, Box), +} + +/// Mapping of a Protocol and Port on the gateway. +#[derive(Debug, Clone)] +pub(crate) struct Mapping { + pub(crate) listener_id: ListenerId, + pub(crate) protocol: PortMappingProtocol, + pub(crate) multiaddr: Multiaddr, + pub(crate) internal_addr: SocketAddr, +} + +impl Mapping { + /// Given the input gateway address, calculate the + /// open external `Multiaddr`. + fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr { + let addr = match gateway_addr { + net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip), + net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip), + }; + self.multiaddr + .replace(0, |_| Some(addr)) + .expect("multiaddr should be valid") + } +} + +impl Hash for Mapping { + fn hash(&self, state: &mut H) { + self.listener_id.hash(state); + } +} + +impl PartialEq for Mapping { + fn eq(&self, other: &Self) -> bool { + self.listener_id == other.listener_id + } +} + +impl Eq for Mapping {} + +impl Borrow for Mapping { + fn borrow(&self) -> &ListenerId { + &self.listener_id + } +} + +/// Current state of a [`Mapping`]. +#[derive(Debug)] +enum MappingState { + /// Port mapping is inactive, will be requested or re-requested on the next iteration. + Inactive, + /// Port mapping/removal has been requested on the gateway. + Pending, + /// Port mapping is active with the inner timeout. + Active(Delay), + /// Port mapping failed, we will try again. + Failed, +} + +/// Current state of the UPnP [`Gateway`]. +enum GatewayState { + Searching(oneshot::Receiver>>), + Available(Gateway), + GatewayNotFound, + NonRoutableGateway(IpAddr), +} + +/// The event produced by `Behaviour`. +#[derive(Debug)] +pub enum Event { + /// The multiaddress is reachable externally. + NewExternalAddr(Multiaddr), + /// The renewal of the multiaddress on the gateway failed. + ExpiredExternalAddr(Multiaddr), + /// The IGD gateway was not found. + GatewayNotFound, + /// The Gateway is not exposed directly to the public network. + NonRoutableGateway, +} + +/// A list of port mappings and its state. +#[derive(Debug, Default)] +struct MappingList(HashMap); + +impl Deref for MappingList { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for MappingList { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl MappingList { + /// Queue for renewal the current mapped ports on the `Gateway` that are expiring, + /// and try to activate the inactive. + fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) { + for (mapping, state) in self.iter_mut() { + match state { + MappingState::Inactive | MappingState::Failed => { + let duration = MAPPING_DURATION; + if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { + mapping: mapping.clone(), + duration, + }) { + log::debug!( + "could not request port mapping for {} on the gateway: {}", + mapping.multiaddr, + err + ); + } + *state = MappingState::Pending; + } + MappingState::Active(timeout) => { + if Pin::new(timeout).poll(cx).is_ready() { + let duration = MAPPING_DURATION; + if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { + mapping: mapping.clone(), + duration, + }) { + log::debug!( + "could not request port mapping for {} on the gateway: {}", + mapping.multiaddr, + err + ); + } + } + } + MappingState::Pending => {} + } + } + } +} + +/// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port +/// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`]. +pub struct Behaviour { + /// UPnP interface state. + state: GatewayState, + + /// List of port mappings. + mappings: MappingList, + + /// Pending behaviour events to be emitted. + pending_events: VecDeque, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + state: GatewayState::Searching(crate::tokio::search_gateway()), + mappings: Default::default(), + pending_events: VecDeque::new(), + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = dummy::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::NewListenAddr(NewListenAddr { + listener_id, + addr: multiaddr, + }) => { + let (addr, protocol) = match multiaddr_to_socketaddr_protocol(multiaddr.clone()) { + Ok(addr_port) => addr_port, + Err(()) => { + log::debug!("multiaddress not supported for UPnP {multiaddr}"); + return; + } + }; + + if let Some((mapping, _state)) = self + .mappings + .iter() + .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port()) + { + log::debug!("port from multiaddress {multiaddr} is already being mapped to another multiaddr: {}", mapping.multiaddr); + return; + } + + match &mut self.state { + GatewayState::Searching(_) => { + // As the gateway is not yet available we add the mapping with `MappingState::Inactive` + // so that when and if it becomes available we map it. + self.mappings.insert( + Mapping { + listener_id, + protocol, + internal_addr: addr, + multiaddr: multiaddr.clone(), + }, + MappingState::Inactive, + ); + } + GatewayState::Available(ref mut gateway) => { + let mapping = Mapping { + listener_id, + protocol, + internal_addr: addr, + multiaddr: multiaddr.clone(), + }; + + let duration = MAPPING_DURATION; + if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { + mapping: mapping.clone(), + duration, + }) { + log::debug!( + "could not request port mapping for {} on the gateway: {}", + mapping.multiaddr, + err + ); + } + + self.mappings.insert(mapping, MappingState::Pending); + } + GatewayState::GatewayNotFound => { + log::debug!( + "network gateway not found, UPnP port mapping of {multiaddr} discarded" + ); + } + GatewayState::NonRoutableGateway(addr) => { + log::debug!( + "the network gateway is not exposed to the public network, \ + it's ip is {addr}. UPnP port mapping of {multiaddr} discarded" + ); + } + }; + } + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { + listener_id, + addr: _addr, + }) => { + if let GatewayState::Available(ref mut gateway) = &mut self.state { + if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) { + if let Err(err) = gateway + .sender + .try_send(GatewayRequest::RemoveMapping(mapping.clone())) + { + log::debug!( + "could not request port removal for {} on the gateway: {}", + mapping.multiaddr, + err + ); + } + self.mappings.insert(mapping, MappingState::Pending); + } + } + } + FromSwarm::ConnectionEstablished(_) + | FromSwarm::ConnectionClosed(_) + | FromSwarm::AddressChange(_) + | FromSwarm::DialFailure(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddrCandidate(_) + | FromSwarm::ExternalAddrConfirmed(_) + | FromSwarm::ExternalAddrExpired(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + event: libp2p_swarm::THandlerOutEvent, + ) { + void::unreachable(event) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll>> { + // If there are pending addresses to be emitted we emit them. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + // Loop through the gateway state so that if it changes from `Searching` to `Available` + // we poll the pending mapping requests. + loop { + match self.state { + GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) { + Poll::Ready(result) => { + match result.expect("sender shouldn't have been dropped") { + Ok(gateway) => { + if !is_addr_global(gateway.external_addr) { + self.state = + GatewayState::NonRoutableGateway(gateway.external_addr); + log::debug!( + "the gateway is not routable, its address is {}", + gateway.external_addr + ); + return Poll::Ready(ToSwarm::GenerateEvent( + Event::NonRoutableGateway, + )); + } + self.state = GatewayState::Available(gateway); + } + Err(err) => { + log::debug!("could not find gateway: {err}"); + self.state = GatewayState::GatewayNotFound; + return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound)); + } + } + } + Poll::Pending => return Poll::Pending, + }, + GatewayState::Available(ref mut gateway) => { + // Poll pending mapping requests. + if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) { + match result { + GatewayEvent::Mapped(mapping) => { + let new_state = MappingState::Active(Delay::new( + Duration::from_secs(MAPPING_TIMEOUT), + )); + + match self + .mappings + .insert(mapping.clone(), new_state) + .expect("mapping should exist") + { + MappingState::Pending => { + let external_multiaddr = + mapping.external_addr(gateway.external_addr); + self.pending_events.push_back(Event::NewExternalAddr( + external_multiaddr.clone(), + )); + log::debug!( + "succcessfully mapped UPnP {} for {} protocol", + mapping.internal_addr, + mapping.protocol + ); + return Poll::Ready(ToSwarm::ExternalAddrConfirmed( + external_multiaddr, + )); + } + MappingState::Active(_) => { + log::debug!( + "succcessfully renewed UPnP mapping {} for {} protocol", + mapping.internal_addr, + mapping.protocol + ); + } + _ => unreachable!(), + } + } + GatewayEvent::MapFailure(mapping, err) => { + match self + .mappings + .insert(mapping.clone(), MappingState::Failed) + .expect("mapping should exist") + { + MappingState::Active(_) => { + log::debug!( + "failed to remap UPnP mapped {} for {} protocol: {err}", + mapping.internal_addr, + mapping.protocol + ); + let external_multiaddr = + mapping.external_addr(gateway.external_addr); + self.pending_events.push_back(Event::ExpiredExternalAddr( + external_multiaddr.clone(), + )); + return Poll::Ready(ToSwarm::ExternalAddrExpired( + external_multiaddr, + )); + } + MappingState::Pending => { + log::debug!( + "failed to map upnp mapped {} for {} protocol: {err}", + mapping.internal_addr, + mapping.protocol + ); + } + _ => { + unreachable!() + } + } + } + GatewayEvent::Removed(mapping) => { + log::debug!( + "succcessfully removed UPnP mapping {} for {} protocol", + mapping.internal_addr, + mapping.protocol + ); + self.mappings + .remove(&mapping) + .expect("mapping should exist"); + } + GatewayEvent::RemovalFailure(mapping, err) => { + log::debug!( + "could not remove UPnP mapping {} for {} protocol: {err}", + mapping.internal_addr, + mapping.protocol + ); + if let Err(err) = gateway + .sender + .try_send(GatewayRequest::RemoveMapping(mapping.clone())) + { + log::debug!( + "could not request port removal for {} on the gateway: {}", + mapping.multiaddr, + err + ); + } + } + } + } + + // Renew expired and request inactive mappings. + self.mappings.renew(gateway, cx); + return Poll::Pending; + } + GatewayState::GatewayNotFound => { + return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound)); + } + GatewayState::NonRoutableGateway(_) => { + return Poll::Ready(ToSwarm::GenerateEvent(Event::NonRoutableGateway)); + } + } + } + } +} + +/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`]. +/// +/// Fails if the given [`Multiaddr`] does not begin with an IP +/// protocol encapsulating a TCP or UDP port. +fn multiaddr_to_socketaddr_protocol( + addr: Multiaddr, +) -> Result<(SocketAddr, PortMappingProtocol), ()> { + let mut iter = addr.into_iter(); + match iter.next() { + // Idg only supports Ipv4. + Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() { + Some(multiaddr::Protocol::Tcp(port)) => { + return Ok(( + SocketAddr::V4(SocketAddrV4::new(ipv4, port)), + PortMappingProtocol::TCP, + )); + } + Some(multiaddr::Protocol::Udp(port)) => { + return Ok(( + SocketAddr::V4(SocketAddrV4::new(ipv4, port)), + PortMappingProtocol::TCP, + )); + } + _ => {} + }, + _ => {} + } + Err(()) +} diff --git a/protocols/upnp/src/lib.rs b/protocols/upnp/src/lib.rs new file mode 100644 index 00000000000..8a74d7e8f63 --- /dev/null +++ b/protocols/upnp/src/lib.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of UPnP port mapping for libp2p. +//! +//! This crate provides a `tokio::Behaviour` which +//! implements the [`libp2p_swarm::NetworkBehaviour`] trait. +//! This struct will automatically try to map the ports externally to internal +//! addresses on the gateway. +//! + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +#[cfg(feature = "tokio")] +mod behaviour; +#[cfg(feature = "tokio")] +pub mod tokio; + +#[cfg(feature = "tokio")] +pub use behaviour::Event; diff --git a/protocols/upnp/src/tokio.rs b/protocols/upnp/src/tokio.rs new file mode 100644 index 00000000000..c6a40182b33 --- /dev/null +++ b/protocols/upnp/src/tokio.rs @@ -0,0 +1,169 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{error::Error, net::IpAddr}; + +use crate::behaviour::{GatewayEvent, GatewayRequest}; +use futures::{ + channel::{mpsc, oneshot}, + SinkExt, StreamExt, +}; +use igd_next::SearchOptions; + +pub use crate::behaviour::Behaviour; + +//TODO: remove when `IpAddr::is_global` stabilizes. +pub(crate) fn is_addr_global(addr: IpAddr) -> bool { + match addr { + IpAddr::V4(ip) => { + !(ip.octets()[0] == 0 // "This network" + || ip.is_private() + // code for Ipv4::is_shared() + || (ip.octets()[0] == 100 && (ip.octets()[1] & 0b1100_0000 == 0b0100_0000)) + || ip.is_loopback() + || ip.is_link_local() + // addresses reserved for future protocols (`192.0.0.0/24`) + ||(ip.octets()[0] == 192 && ip.octets()[1] == 0 && ip.octets()[2] == 0) + || ip.is_documentation() + // code for Ipv4::is_benchmarking() + || (ip.octets()[0] == 198 && (ip.octets()[1] & 0xfe) == 18) + // code for Ipv4::is_reserved() + || (ip.octets()[0] & 240 == 240 && !ip.is_broadcast()) + || ip.is_broadcast()) + } + IpAddr::V6(ip) => { + !(ip.is_unspecified() + || ip.is_loopback() + // IPv4-mapped Address (`::ffff:0:0/96`) + || matches!(ip.segments(), [0, 0, 0, 0, 0, 0xffff, _, _]) + // IPv4-IPv6 Translat. (`64:ff9b:1::/48`) + || matches!(ip.segments(), [0x64, 0xff9b, 1, _, _, _, _, _]) + // Discard-Only Address Block (`100::/64`) + || matches!(ip.segments(), [0x100, 0, 0, 0, _, _, _, _]) + // IETF Protocol Assignments (`2001::/23`) + || (matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200) + && !( + // Port Control Protocol Anycast (`2001:1::1`) + u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001 + // Traversal Using Relays around NAT Anycast (`2001:1::2`) + || u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002 + // AMT (`2001:3::/32`) + || matches!(ip.segments(), [0x2001, 3, _, _, _, _, _, _]) + // AS112-v6 (`2001:4:112::/48`) + || matches!(ip.segments(), [0x2001, 4, 0x112, _, _, _, _, _]) + // ORCHIDv2 (`2001:20::/28`) + || matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if (0x20..=0x2F).contains(&b)) + )) + // code for Ipv4::is_documentation() + || (ip.segments()[0] == 0x2001) && (ip.segments()[1] == 0xdb8) + // code for Ipv4::is_unique_local() + || (ip.segments()[0] & 0xfe00) == 0xfc00 + // code for Ipv4::is_unicast_link_local() + || (ip.segments()[0] & 0xffc0) == 0xfe80) + } + } +} + +/// Interface that interacts with the inner gateway by messages, +/// `GatewayRequest`s and `GatewayEvent`s. +#[derive(Debug)] +pub(crate) struct Gateway { + pub(crate) sender: mpsc::Sender, + pub(crate) receiver: mpsc::Receiver, + pub(crate) external_addr: IpAddr, +} + +pub(crate) fn search_gateway() -> oneshot::Receiver>> { + let (search_result_sender, search_result_receiver) = oneshot::channel(); + + let (events_sender, mut task_receiver) = mpsc::channel(10); + let (mut task_sender, events_queue) = mpsc::channel(0); + + tokio::spawn(async move { + let gateway = match igd_next::aio::tokio::search_gateway(SearchOptions::default()).await { + Ok(gateway) => gateway, + Err(err) => { + search_result_sender + .send(Err(err.into())) + .expect("receiver shouldn't have been dropped"); + return; + } + }; + + let external_addr = match gateway.get_external_ip().await { + Ok(addr) => addr, + Err(err) => { + search_result_sender + .send(Err(err.into())) + .expect("receiver shouldn't have been dropped"); + return; + } + }; + + search_result_sender + .send(Ok(Gateway { + sender: events_sender, + receiver: events_queue, + external_addr, + })) + .expect("receiver shouldn't have been dropped"); + + loop { + // The task sender has dropped so we can return. + let Some(req) = task_receiver.next().await else { + return; + }; + let event = match req { + GatewayRequest::AddMapping { mapping, duration } => { + let gateway = gateway.clone(); + match gateway + .add_port( + mapping.protocol, + mapping.internal_addr.port(), + mapping.internal_addr, + duration, + "rust-libp2p mapping", + ) + .await + { + Ok(()) => GatewayEvent::Mapped(mapping), + Err(err) => GatewayEvent::MapFailure(mapping, err.into()), + } + } + GatewayRequest::RemoveMapping(mapping) => { + let gateway = gateway.clone(); + match gateway + .remove_port(mapping.protocol, mapping.internal_addr.port()) + .await + { + Ok(()) => GatewayEvent::Removed(mapping), + Err(err) => GatewayEvent::RemovalFailure(mapping, err.into()), + } + } + }; + task_sender + .send(event) + .await + .expect("receiver should be available"); + } + }); + + search_result_receiver +}