diff --git a/CHANGELOG.md b/CHANGELOG.md index 95e41bb020c..d28138e1e7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,10 @@ # `libp2p` facade crate +# 0.49.0 - [unreleased] + +- Update to [`libp2p-tcp` `v0.37.0`](transports/tcp/CHANGELOG.md#0370). + # 0.48.0 - Update to [`libp2p-core` `v0.36.0`](core/CHANGELOG.md#0360). diff --git a/Cargo.toml b/Cargo.toml index 62b8df404f8..89e63a4e5cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p" edition = "2021" rust-version = "1.60.0" description = "Peer-to-peer networking library" -version = "0.48.0" +version = "0.49.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -110,7 +110,7 @@ smallvec = "1.6.1" libp2p-deflate = { version = "0.36.0", path = "transports/deflate", optional = true } libp2p-dns = { version = "0.36.0", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false } -libp2p-tcp = { version = "0.36.0", path = "transports/tcp", default-features = false, optional = true } +libp2p-tcp = { version = "0.37.0", path = "transports/tcp", default-features = false, optional = true } libp2p-websocket = { version = "0.38.0", path = "transports/websocket", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 42cdd64acec..ff34ae49407 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.37.0 - [unreleased] + +- Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration. + Use `if_watch::IfWatcher` for all runtimes. See [PR 2813]. + +[PR 2813]: https://github.com/libp2p/rust-libp2p/pull/2813 + # 0.36.0 - Update to `libp2p-core` `v0.36.0`. diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index d4577c74252..948d9507f0a 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-tcp" edition = "2021" rust-version = "1.56.1" description = "TCP/IP transport protocol for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,9 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = { version = "1.1.1", optional = true } -if-addrs = { version = "0.7.0", optional = true } -ipnet = "2.0.0" +if-watch = "2.0.0" libc = "0.2.80" libp2p-core = { version = "0.36.0", path = "../../core", default-features = false } log = "0.4.11" @@ -25,10 +23,10 @@ tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, [features] default = ["async-io"] -tokio = ["tokio-crate", "if-addrs"] -async-io = ["async-io-crate", "if-watch"] +tokio = ["tokio-crate"] +async-io = ["async-io-crate"] [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt"] } +tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] } env_logger = "0.9.0" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 981c896bcb5..f7b897c0d47 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -28,6 +28,7 @@ mod provider; +use if_watch::{IfEvent, IfWatcher}; #[cfg(feature = "async-io")] pub use provider::async_io; @@ -43,9 +44,8 @@ pub use provider::tokio; pub type TokioTcpTransport = GenTcpTransport; use futures::{ - future::{self, BoxFuture, Ready}, + future::{self, Ready}, prelude::*, - ready, }; use futures_timer::Delay; use libp2p_core::{ @@ -64,7 +64,7 @@ use std::{ time::Duration, }; -use provider::{IfEvent, Provider}; +use provider::{Incoming, Provider}; /// The configuration for a TCP/IP transport capability for libp2p. #[derive(Clone, Debug)] @@ -243,6 +243,9 @@ impl GenTcpConfig { /// # use libp2p_core::transport::{ListenerId, TransportEvent}; /// # use libp2p_core::{Multiaddr, Transport}; /// # use std::pin::Pin; + /// # #[cfg(not(feature = "async-io"))] + /// # fn main() {} + /// # /// #[cfg(feature = "async-io")] /// #[async_std::main] /// async fn main() -> std::io::Result<()> { @@ -368,7 +371,25 @@ where socket.bind(&socket_addr.into())?; socket.listen(self.config.backlog as _)?; socket.set_nonblocking(true)?; - TcpListenStream::::new(id, socket.into(), self.port_reuse.clone()) + let listener: TcpListener = socket.into(); + let local_addr = listener.local_addr()?; + + if local_addr.ip().is_unspecified() { + return TcpListenStream::::new( + id, + listener, + Some(IfWatcher::new()?), + self.port_reuse.clone(), + ); + } + + self.port_reuse.register(local_addr.ip(), local_addr.port()); + let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + self.pending_events.push_back(TransportEvent::NewAddress { + listener_id: id, + listen_addr, + }); + TcpListenStream::::new(id, listener, None, self.port_reuse.clone()) } } @@ -398,7 +419,6 @@ impl Transport for GenTcpTransport where T: Provider + Send + 'static, T::Listener: Unpin, - T::IfWatcher: Unpin, T::Stream: Unpin, { type Output = T::Stream; @@ -605,25 +625,6 @@ pub enum TcpListenerEvent { Error(io::Error), } -enum IfWatch { - Pending(BoxFuture<'static, io::Result>), - Ready(TIfWatcher), -} - -/// The listening addresses of a [`TcpListenStream`]. -enum InAddr { - /// The stream accepts connections on a single interface. - One { - addr: IpAddr, - out: Option, - }, - /// The stream accepts connections on all interfaces. - Any { - addrs: HashSet, - if_watch: IfWatch, - }, -} - /// A stream of incoming connections on one or more interfaces. pub struct TcpListenStream where @@ -637,12 +638,12 @@ where listen_addr: SocketAddr, /// The async listening socket for incoming connections. listener: T::Listener, - /// The IP addresses of network interfaces on which the listening socket - /// is accepting connections. + /// Watcher for network interface changes. + /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces + /// become or stop being available. /// - /// If the listen socket listens on all interfaces, these may change over - /// time as interfaces become available or unavailable. - in_addr: InAddr, + /// `None` if the socket is only listening on a single interface. + if_watcher: Option, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -666,27 +667,10 @@ where fn new( listener_id: ListenerId, listener: TcpListener, + if_watcher: Option, port_reuse: PortReuse, ) -> io::Result { let listen_addr = listener.local_addr()?; - - let in_addr = if match &listen_addr { - SocketAddr::V4(a) => a.ip().is_unspecified(), - SocketAddr::V6(a) => a.ip().is_unspecified(), - } { - // The `addrs` are populated via `if_watch` when the - // `TcpListenStream` is polled. - InAddr::Any { - addrs: HashSet::new(), - if_watch: IfWatch::Pending(T::if_watcher()), - } - } else { - InAddr::One { - out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())), - addr: listen_addr.ip(), - } - }; - let listener = T::new_listener(listener)?; Ok(TcpListenStream { @@ -694,7 +678,7 @@ where listener, listener_id, listen_addr, - in_addr, + if_watcher, pause: None, sleep_on_error: Duration::from_millis(100), }) @@ -707,15 +691,16 @@ where /// /// Has no effect if port reuse is disabled. fn disable_port_reuse(&mut self) { - match &self.in_addr { - InAddr::One { addr, .. } => { - self.port_reuse.unregister(*addr, self.listen_addr.port()); - } - InAddr::Any { addrs, .. } => { - for addr in addrs { - self.port_reuse.unregister(*addr, self.listen_addr.port()); + match &self.if_watcher { + Some(if_watcher) => { + for ip_net in if_watcher.iter() { + self.port_reuse + .unregister(ip_net.addr(), self.listen_addr.port()); } } + None => self + .port_reuse + .unregister(self.listen_addr.ip(), self.listen_addr.port()), } } } @@ -734,116 +719,78 @@ where T: Provider, T::Listener: Unpin, T::Stream: Unpin, - T::IfWatcher: Unpin, { type Item = Result, io::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); - loop { - match &mut me.in_addr { - InAddr::Any { if_watch, addrs } => match if_watch { - // If we listen on all interfaces, wait for `if-watch` to be ready. - IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) { - Ok(w) => { - *if_watch = IfWatch::Ready(w); - continue; - } - Err(err) => { - log::debug! { - "Failed to begin observing interfaces: {:?}. Scheduling retry.", - err - }; - *if_watch = IfWatch::Pending(T::if_watcher()); - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); - } - }, - // Consume all events for up/down interface changes. - IfWatch::Ready(watch) => { - while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) { - match ev { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) - { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("New listen address: {}", ma); - me.port_reuse.register(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::NewAddress(ma), - ))); - } - } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) - { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("Expired listen address: {}", ma); - me.port_reuse.unregister(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::AddressExpired(ma), - ))); - } - } - Err(err) => { - log::debug! { - "Failure polling interfaces: {:?}. Scheduling retry.", - err - }; - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); - } - } - } - } - }, - // If the listener is bound to a single interface, make sure the - // address is registered for port reuse and reported once. - InAddr::One { addr, out } => { - if let Some(multiaddr) = out.take() { - me.port_reuse.register(*addr, me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr)))); - } + if let Some(mut pause) = me.pause.take() { + match pause.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + me.pause = Some(pause); + return Poll::Pending; } } + } - if let Some(mut pause) = me.pause.take() { - match Pin::new(&mut pause).poll(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - me.pause = Some(pause); - return Poll::Pending; + if let Some(if_watcher) = me.if_watcher.as_mut() { + while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { + match event { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("New listen address: {}", ma); + me.port_reuse.register(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); + } + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("Expired listen address: {}", ma); + me.port_reuse.unregister(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); + } + } + Err(err) => { + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } } } + } - // Take the pending connection from the backlog. - let incoming = match T::poll_accept(&mut me.listener, cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(incoming)) => incoming, - Poll::Ready(Err(e)) => { - // These errors are non-fatal for the listener stream. - log::error!("error accepting incoming connection: {}", e); - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); - } - }; + // Take the pending connection from the backlog. + match T::poll_accept(&mut me.listener, cx) { + Poll::Ready(Ok(Incoming { + local_addr, + remote_addr, + stream, + })) => { + let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port()); - let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); - let remote_addr = - ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); + log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); - log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); + return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { + upgrade: future::ok(stream), + local_addr, + remote_addr, + }))); + } + Poll::Ready(Err(e)) => { + // These errors are non-fatal for the listener stream. + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); + } + Poll::Pending => {} + }; - return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { - upgrade: future::ok(incoming.stream), - local_addr, - remote_addr, - }))); - } + Poll::Pending } } @@ -991,7 +938,7 @@ mod tests { #[cfg(feature = "tokio")] { let (ready_tx, ready_rx) = mpsc::channel(1); - let listener = listener::(addr.clone(), ready_tx); + let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() @@ -1060,7 +1007,7 @@ mod tests { #[cfg(feature = "tokio")] { let (ready_tx, ready_rx) = mpsc::channel(1); - let listener = listener::(addr.clone(), ready_tx); + let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() @@ -1168,7 +1115,7 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let (port_reuse_tx, port_reuse_rx) = oneshot::channel(); let listener = listener::(addr.clone(), ready_tx, port_reuse_rx); - let dialer = dialer::(addr.clone(), ready_rx, port_reuse_tx); + let dialer = dialer::(addr, ready_rx, port_reuse_tx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .build() @@ -1209,10 +1156,7 @@ mod tests { match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await { TransportEvent::NewAddress { listen_addr: addr2, .. - } => { - assert_eq!(addr1, addr2); - return; - } + } => assert_eq!(addr1, addr2), e => panic!("Unexpected transport event: {:?}", e), } } @@ -1229,7 +1173,7 @@ mod tests { #[cfg(feature = "tokio")] { - let listener = listen_twice::(addr.clone()); + let listener = listen_twice::(addr); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .build() @@ -1267,7 +1211,7 @@ mod tests { .enable_io() .build() .unwrap(); - let new_addr = rt.block_on(listen::(addr.clone())); + let new_addr = rt.block_on(listen::(addr)); assert!(!new_addr.to_string().contains("tcp/0")); } } @@ -1290,7 +1234,7 @@ mod tests { #[cfg(feature = "tokio")] { let mut tcp = TokioTcpTransport::new(GenTcpConfig::new()); - assert!(tcp.listen_on(addr.clone()).is_err()); + assert!(tcp.listen_on(addr).is_err()); } } diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index 7ebeaa49ee8..a341026e7e6 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -28,18 +28,10 @@ pub mod tokio; use futures::future::BoxFuture; use futures::io::{AsyncRead, AsyncWrite}; -use ipnet::IpNet; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::task::{Context, Poll}; use std::{fmt, io}; -/// An event relating to a change of availability of an address -/// on a network interface. -pub enum IfEvent { - Up(IpNet), - Down(IpNet), -} - /// An incoming connection returned from [`Provider::poll_accept()`]. pub struct Incoming { pub stream: S, @@ -54,12 +46,6 @@ pub trait Provider: Clone + Send + 'static { type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; /// The type of TCP listeners obtained from [`Provider::new_listener`]. type Listener: Send + Unpin; - /// The type of network interface observers obtained from [`Provider::if_watcher`]. - type IfWatcher: Send + Unpin; - - /// Creates an instance of [`Self::IfWatcher`] that can be polled for - /// network interface changes via [`Self::poll_interfaces`]. - fn if_watcher() -> BoxFuture<'static, io::Result>; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. @@ -77,8 +63,4 @@ pub trait Provider: Clone + Send + 'static { _: &mut Self::Listener, _: &mut Context<'_>, ) -> Poll>>; - - /// Polls a [`Self::IfWatcher`] for network interface changes, ensuring a task wakeup, - /// if necessary. - fn poll_interfaces(_: &mut Self::IfWatcher, _: &mut Context<'_>) -> Poll>; } diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index acbb4fbdcca..fc613d8fe86 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{IfEvent, Incoming, Provider}; +use super::{Incoming, Provider}; use async_io_crate::Async; use futures::future::{BoxFuture, FutureExt}; @@ -32,11 +32,6 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = Async; type Listener = Async; - type IfWatcher = if_watch::IfWatcher; - - fn if_watcher() -> BoxFuture<'static, io::Result> { - if_watch::IfWatcher::new().boxed() - } fn new_listener(l: net::TcpListener) -> io::Result { Async::new(l) @@ -87,11 +82,4 @@ impl Provider for Tcp { remote_addr, })) } - - fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - w.poll_unpin(cx).map_ok(|e| match e { - if_watch::IfEvent::Up(a) => IfEvent::Up(a), - if_watch::IfEvent::Down(a) => IfEvent::Down(a), - }) - } } diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 564eebfa48b..994a12a33c7 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -18,45 +18,24 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{IfEvent, Incoming, Provider}; +use super::{Incoming, Provider}; use futures::{ - future::{self, BoxFuture, FutureExt}, + future::{BoxFuture, FutureExt}, prelude::*, }; -use futures_timer::Delay; -use if_addrs::{get_if_addrs, IfAddr}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use std::collections::HashSet; use std::convert::TryFrom; use std::io; use std::net; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; #[derive(Copy, Clone)] pub enum Tcp {} -pub struct IfWatcher { - addrs: HashSet, - delay: Delay, - pending: Vec, -} - impl Provider for Tcp { type Stream = TcpStream; type Listener = tokio_crate::net::TcpListener; - type IfWatcher = IfWatcher; - - fn if_watcher() -> BoxFuture<'static, io::Result> { - future::ready(Ok(IfWatcher { - addrs: HashSet::new(), - delay: Delay::new(Duration::from_secs(0)), - pending: Vec::new(), - })) - .boxed() - } fn new_listener(l: net::TcpListener) -> io::Result { tokio_crate::net::TcpListener::try_from(l) @@ -104,51 +83,6 @@ impl Provider for Tcp { remote_addr, })) } - - fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - loop { - if let Some(event) = w.pending.pop() { - return Poll::Ready(Ok(event)); - } - - match Pin::new(&mut w.delay).poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(()) => { - let ifs = get_if_addrs()?; - let addrs = ifs - .into_iter() - .map(|iface| match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = - (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = - (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len can not exceed 128"); - IpNet::V6(ipnet) - } - }) - .collect::>(); - - for down in w.addrs.difference(&addrs) { - w.pending.push(IfEvent::Down(*down)); - } - - for up in addrs.difference(&w.addrs) { - w.pending.push(IfEvent::Up(*up)); - } - - w.addrs = addrs; - w.delay.reset(Duration::from_secs(10)); - } - } - } - } } /// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].