From fe3e09b710caeaaf26a6ea83e618a19d3c2285a1 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Fri, 9 Sep 2022 23:43:29 +0200 Subject: [PATCH] transports/quic: upgrade to if-watch v2.0.0 See corresponding change in tcp transport: libp2p#2813. --- transports/quic/Cargo.toml | 2 +- transports/quic/src/in_addr.rs | 100 -------------------- transports/quic/src/lib.rs | 1 - transports/quic/src/transport.rs | 153 +++++++++++++++---------------- 4 files changed, 77 insertions(+), 179 deletions(-) delete mode 100644 transports/quic/src/in_addr.rs diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 0480c0629ed..621ce8d8e05 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT" async-global-executor = "2.0.2" async-io = "1.6.0" futures = "0.3.15" -if-watch = "1.0.0" +if-watch = "2.0.0" libp2p-core = { version = "0.36.0", path = "../../core" } parking_lot = "0.12.0" quinn-proto = { version = "0.8.2", default-features = false, features = ["tls-rustls"] } diff --git a/transports/quic/src/in_addr.rs b/transports/quic/src/in_addr.rs deleted file mode 100644 index 67b6abbf3f3..00000000000 --- a/transports/quic/src/in_addr.rs +++ /dev/null @@ -1,100 +0,0 @@ -use if_watch::{IfEvent, IfWatcher}; - -use futures::{ - future::{BoxFuture, FutureExt}, - stream::Stream, -}; - -use std::{ - io::Result, - net::IpAddr, - ops::DerefMut, - pin::Pin, - task::{Context, Poll}, -}; - -/// Watches for interface changes. -#[derive(Debug)] -pub enum InAddr { - /// The socket accepts connections on a single interface. - One { ip: Option }, - /// The socket accepts connections on all interfaces. - Any { if_watch: Box }, -} - -impl InAddr { - /// If ip is specified then only one `IfEvent::Up` with IpNet(ip)/32 will be generated. - /// If ip is unspecified then `IfEvent::Up/Down` events will be generated for all interfaces. - pub fn new(ip: IpAddr) -> Self { - if ip.is_unspecified() { - let watcher = IfWatch::Pending(IfWatcher::new().boxed()); - InAddr::Any { - if_watch: Box::new(watcher), - } - } else { - InAddr::One { ip: Some(ip) } - } - } -} - -pub enum IfWatch { - Pending(BoxFuture<'static, std::io::Result>), - Ready(Box), -} - -impl std::fmt::Debug for IfWatch { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - IfWatch::Pending(_) => write!(f, "Pending"), - IfWatch::Ready(_) => write!(f, "Ready"), - } - } -} -impl Stream for InAddr { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let me = Pin::into_inner(self); - loop { - match me { - // If the listener is bound to a single interface, make sure the - // address is reported once. - InAddr::One { ip } => { - if let Some(ip) = ip.take() { - return Poll::Ready(Some(Ok(IfEvent::Up(ip.into())))); - } - } - InAddr::Any { if_watch } => { - match if_watch.deref_mut() { - // If we listen on all interfaces, wait for `if-watch` to be ready. - IfWatch::Pending(f) => match futures::ready!(f.poll_unpin(cx)) { - Ok(watcher) => { - *if_watch = Box::new(IfWatch::Ready(Box::new(watcher))); - continue; - } - Err(err) => { - *if_watch = Box::new(IfWatch::Pending(IfWatcher::new().boxed())); - return Poll::Ready(Some(Err(err))); - } - }, - // Consume all events for up/down interface changes. - IfWatch::Ready(watcher) => { - if let Poll::Ready(ev) = watcher.poll_unpin(cx) { - match ev { - Ok(event) => { - return Poll::Ready(Some(Ok(event))); - } - Err(err) => { - return Poll::Ready(Some(Err(err))); - } - } - } - } - } - } - } - break; - } - Poll::Pending - } -} diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 3dca1d3cbe3..2d9a4491bab 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -55,7 +55,6 @@ mod connection; mod endpoint; mod error; -mod in_addr; mod muxer; mod tls; mod upgrade; diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 26dcac29109..0aa3295ac0a 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -25,13 +25,14 @@ use crate::connection::Connection; use crate::endpoint::ToEndpoint; use crate::Config; -use crate::{endpoint::Endpoint, in_addr::InAddr, muxer::QuicMuxer, upgrade::Upgrade}; +use crate::{endpoint::Endpoint, muxer::QuicMuxer, upgrade::Upgrade}; use futures::channel::oneshot; +use futures::ready; use futures::stream::StreamExt; use futures::{channel::mpsc, prelude::*, stream::SelectAll}; -use if_watch::IfEvent; +use if_watch::{IfEvent, IfWatcher}; use libp2p_core::{ multiaddr::{Multiaddr, Protocol}, @@ -253,17 +254,13 @@ struct Listener { /// Channel where new connections are being sent. new_connections_rx: mpsc::Receiver, - /// The IP addresses of network interfaces on which the listening socket - /// is accepting connections. - /// - /// If the listen socket listens on all interfaces, these may change over - /// time as interfaces become available or unavailable. - in_addr: InAddr, + if_watcher: Option, - /// Set to `Some` if this [`Listener`] should close. - /// Optionally contains a [`TransportEvent::ListenerClosed`] that should be - /// reported before the listener's stream is terminated. - report_closed: Option::Item>>, + /// Whether the listener was closed and the stream should terminate. + is_closed: bool, + + /// Pending event to reported. + pending_event: Option<::Item>, pending_dials: VecDeque, @@ -276,14 +273,29 @@ impl Listener { socket_addr: SocketAddr, config: Config, ) -> Result { - let in_addr = InAddr::new(socket_addr.ip()); let (endpoint, new_connections_rx) = Endpoint::new_bidirectional(config, socket_addr)?; + + let if_watcher; + let pending_event; + if socket_addr.ip().is_unspecified() { + if_watcher = Some(IfWatcher::new()?); + pending_event = None; + } else { + if_watcher = None; + let ma = socketaddr_to_multiaddr(&endpoint.socket_addr); + pending_event = Some(TransportEvent::NewAddress { + listener_id, + listen_addr: ma, + }) + } + Ok(Listener { endpoint, listener_id, new_connections_rx, - in_addr, - report_closed: None, + if_watcher, + is_closed: false, + pending_event, pending_dials: VecDeque::new(), waker: None, }) @@ -292,68 +304,54 @@ impl Listener { /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and /// terminate the stream. fn close(&mut self, reason: Result<(), Error>) { - match self.report_closed { - Some(_) => tracing::debug!("Listener was already closed."), - None => { - // Report the listener event as closed. - let _ = self - .report_closed - .insert(Some(TransportEvent::ListenerClosed { - listener_id: self.listener_id, - reason, - })); - } + if self.is_closed { + return; } + self.pending_event = Some(TransportEvent::ListenerClosed { + listener_id: self.listener_id, + reason, + }); + self.is_closed = true; } /// Poll for a next If Event. - fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Option<::Item> { + fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { + let if_watcher = match self.if_watcher.as_mut() { + Some(iw) => iw, + None => return Poll::Pending, + }; loop { - match self.in_addr.poll_next_unpin(cx) { - Poll::Ready(mut item) => { - if let Some(item) = item.take() { - // Consume all events for up/down interface changes. - match item { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() { - let socket_addr = - SocketAddr::new(ip, self.endpoint.socket_addr.port()); - let ma = socketaddr_to_multiaddr(&socket_addr); - tracing::debug!("New listen address: {}", ma); - return Some(TransportEvent::NewAddress { - listener_id: self.listener_id, - listen_addr: ma, - }); - } - } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() { - let socket_addr = - SocketAddr::new(ip, self.endpoint.socket_addr.port()); - let ma = socketaddr_to_multiaddr(&socket_addr); - tracing::debug!("Expired listen address: {}", ma); - return Some(TransportEvent::AddressExpired { - listener_id: self.listener_id, - listen_addr: ma, - }); - } - } - Err(err) => { - tracing::debug! { - "Failure polling interfaces: {:?}.", - err - }; - return Some(TransportEvent::ListenerError { - listener_id: self.listener_id, - error: err.into(), - }); - } - } + match ready!(if_watcher.poll_if_event(cx)) { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() { + let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr.port()); + let ma = socketaddr_to_multiaddr(&socket_addr); + tracing::debug!("New listen address: {}", ma); + return Poll::Ready(TransportEvent::NewAddress { + listener_id: self.listener_id, + listen_addr: ma, + }); } } - Poll::Pending => return None, + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if self.endpoint.socket_addr.is_ipv4() == ip.is_ipv4() { + let socket_addr = SocketAddr::new(ip, self.endpoint.socket_addr.port()); + let ma = socketaddr_to_multiaddr(&socket_addr); + tracing::debug!("Expired listen address: {}", ma); + return Poll::Ready(TransportEvent::AddressExpired { + listener_id: self.listener_id, + listen_addr: ma, + }); + } + } + Err(err) => { + return Poll::Ready(TransportEvent::ListenerError { + listener_id: self.listener_id, + error: err.into(), + }) + } } } } @@ -363,15 +361,16 @@ impl Stream for Listener { type Item = TransportEvent<::ListenerUpgrade, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - if let Some(closed) = self.report_closed.as_mut() { - // Listener was closed. - // Report the transport event if there is one. On the next iteration, return - // `Poll::Ready(None)` to terminate the stream. - return Poll::Ready(closed.take()); - } - if let Some(event) = self.poll_if_addr(cx) { + if let Some(event) = self.pending_event.take() { return Poll::Ready(Some(event)); } + if self.is_closed { + return Poll::Ready(None); + } + match self.poll_if_addr(cx) { + Poll::Ready(event) => return Poll::Ready(Some(event)), + Poll::Pending => {} + } if !self.pending_dials.is_empty() { match self.endpoint.to_endpoint.poll_ready_unpin(cx) { Poll::Ready(Ok(_)) => {