Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/tcp: simplify IfWatcher integration #2813

Merged
merged 28 commits into from Sep 10, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cf5bda2
transports/tcp: Update to `if-watch` v2.0.0
elenaf9 Aug 11, 2022
f832d08
transports/tcp: use `if-watcher` in all runtimes
elenaf9 Aug 11, 2022
e72b2fe
transports/tcp: fix docs for not feature async-io
elenaf9 Aug 11, 2022
9b1dc6b
tcp/transports: remove addr(s) from `InAddr`
elenaf9 Aug 11, 2022
1a8c64f
Merge branch 'master' of github.com:libp2p/rust-libp2p into tcp/refac…
elenaf9 Aug 18, 2022
2198640
transports/tcp: fix clippy
elenaf9 Aug 18, 2022
a39c192
Merge branch 'master' of github.com:libp2p/rust-libp2p into tcp/refac…
elenaf9 Aug 19, 2022
a139f76
transports/tcp: adapt to latest `if-watch` master
elenaf9 Aug 19, 2022
e59d665
transports/tcp: Use `if-watch` from crates.io
elenaf9 Aug 20, 2022
9cc1678
Merge branch 'master' of github.com:libp2p/rust-libp2p into tcp/refac…
elenaf9 Aug 20, 2022
64a3165
transports/tcp: remove `InAddr`
elenaf9 Aug 11, 2022
8d16ad7
transports/tcp: consistent poll style
elenaf9 Aug 20, 2022
603b39c
transports/tcp: poll `pause` before `IfWatcher`
elenaf9 Aug 20, 2022
9f3b14e
transports/tcp/tests: fix clippy
elenaf9 Aug 20, 2022
e2ca55d
transports/tcp: use `poll_unpin` to poll `pause`
elenaf9 Aug 20, 2022
2e5b11d
transports/tcp: add changelog entry
elenaf9 Aug 20, 2022
582582a
transports/tcp: docs
elenaf9 Aug 20, 2022
856065b
transports/tcp: fmt
elenaf9 Aug 20, 2022
e2d001f
transports/tcp: minor style change
elenaf9 Aug 23, 2022
607ba62
Merge branch 'master' of github.com:libp2p/rust-libp2p into tcp/refac…
elenaf9 Aug 23, 2022
4a63d84
transports/tcp: return early if ip is unspecified
elenaf9 Aug 30, 2022
972a34e
Merge branch 'master' of github.com:libp2p/rust-libp2p into tcp/refac…
elenaf9 Aug 30, 2022
036f56f
transports/tcp: don't log error before returning
elenaf9 Aug 30, 2022
864237f
Merge branch 'master' into tcp/refactor-inaddr
elenaf9 Sep 7, 2022
773cd1b
Merge branch 'master' into tcp/refactor-inaddr
mxinden Sep 9, 2022
99fa0cb
transports/tcp: Move changelog entry and bump version
mxinden Sep 9, 2022
9f802b2
transports/tcp/CHANGELOG: Mark entry as unreleased
mxinden Sep 9, 2022
d5af2ca
Cargo.toml: Bump version
mxinden Sep 9, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 4 additions & 6 deletions transports/tcp/Cargo.toml
Expand Up @@ -14,9 +14,7 @@ categories = ["network-programming", "asynchronous"]
async-io-crate = { package = "async-io", version = "1.2.0", optional = true }
futures = "0.3.8"
futures-timer = "3.0"
if-watch = { version = "1.1.1", optional = true }
if-addrs = { version = "0.7.0", optional = true }
ipnet = "2.0.0"
if-watch = { version = "2.0.0", git = "https://github.com/mxinden/if-watch.git" }
libc = "0.2.80"
libp2p-core = { version = "0.35.0", path = "../../core", default-features = false }
log = "0.4.11"
Expand All @@ -25,10 +23,10 @@ tokio-crate = { package = "tokio", version = "1.19.0", default-features = false,

[features]
default = ["async-io"]
tokio = ["tokio-crate", "if-addrs"]
async-io = ["async-io-crate", "if-watch"]
tokio = ["tokio-crate"]
async-io = ["async-io-crate"]

[dev-dependencies]
async-std = { version = "1.6.5", features = ["attributes"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] }
env_logger = "0.9.0"
195 changes: 77 additions & 118 deletions transports/tcp/src/lib.rs
Expand Up @@ -28,6 +28,7 @@

mod provider;

use if_watch::{IfEvent, IfWatcher};
#[cfg(feature = "async-io")]
pub use provider::async_io;

Expand All @@ -43,9 +44,8 @@ pub use provider::tokio;
pub type TokioTcpTransport = GenTcpTransport<tokio::Tcp>;

use futures::{
future::{self, BoxFuture, Ready},
future::{self, Ready},
prelude::*,
ready,
};
use futures_timer::Delay;
use libp2p_core::{
Expand All @@ -64,7 +64,7 @@ use std::{
time::Duration,
};

use provider::{IfEvent, Provider};
use provider::Provider;

/// The configuration for a TCP/IP transport capability for libp2p.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -243,6 +243,9 @@ impl GenTcpConfig {
/// # use libp2p_core::transport::{ListenerId, TransportEvent};
/// # use libp2p_core::{Multiaddr, Transport};
/// # use std::pin::Pin;
/// # #[cfg(not(feature = "async-io"))]
/// # fn main() {}
/// #
/// #[cfg(feature = "async-io")]
/// #[async_std::main]
/// async fn main() -> std::io::Result<()> {
Expand Down Expand Up @@ -398,7 +401,6 @@ impl<T> Transport for GenTcpTransport<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::IfWatcher: Unpin,
T::Stream: Unpin,
{
type Output = T::Stream;
Expand Down Expand Up @@ -605,23 +607,12 @@ pub enum TcpListenerEvent<S> {
Error(io::Error),
}

enum IfWatch<TIfWatcher> {
Pending(BoxFuture<'static, io::Result<TIfWatcher>>),
Ready(TIfWatcher),
}

/// The listening addresses of a [`TcpListenStream`].
enum InAddr<TIfWatcher> {
enum InAddr {
/// The stream accepts connections on a single interface.
One {
addr: IpAddr,
out: Option<Multiaddr>,
},
One(Option<Multiaddr>),
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
/// The stream accepts connections on all interfaces.
Any {
addrs: HashSet<IpAddr>,
if_watch: IfWatch<TIfWatcher>,
},
Any(Box<IfWatcher>),
}

/// A stream of incoming connections on one or more interfaces.
Expand All @@ -642,7 +633,7 @@ where
///
/// If the listen socket listens on all interfaces, these may change over
/// time as interfaces become available or unavailable.
in_addr: InAddr<T::IfWatcher>,
in_addr: InAddr,
/// The port reuse configuration for outgoing connections.
///
/// If enabled, all IP addresses on which this listening stream
Expand Down Expand Up @@ -676,15 +667,9 @@ where
} {
// The `addrs` are populated via `if_watch` when the
// `TcpListenStream` is polled.
InAddr::Any {
addrs: HashSet::new(),
if_watch: IfWatch::Pending(T::if_watcher()),
}
InAddr::Any(Box::new(IfWatcher::new()?))
} else {
InAddr::One {
out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())),
addr: listen_addr.ip(),
}
InAddr::One(Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())))
};

let listener = T::new_listener(listener)?;
Expand All @@ -708,12 +693,14 @@ where
/// Has no effect if port reuse is disabled.
fn disable_port_reuse(&mut self) {
match &self.in_addr {
InAddr::One { addr, .. } => {
self.port_reuse.unregister(*addr, self.listen_addr.port());
InAddr::One(_) => {
self.port_reuse
.unregister(self.listen_addr.ip(), self.listen_addr.port());
}
InAddr::Any { addrs, .. } => {
for addr in addrs {
self.port_reuse.unregister(*addr, self.listen_addr.port());
InAddr::Any(if_watcher) => {
for ip_net in if_watcher.iter() {
self.port_reuse
.unregister(ip_net.addr(), self.listen_addr.port());
}
}
}
Expand All @@ -734,116 +721,88 @@ where
T: Provider,
T::Listener: Unpin,
T::Stream: Unpin,
T::IfWatcher: Unpin,
{
type Item = Result<TcpListenerEvent<T::Stream>, io::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let me = Pin::into_inner(self);

loop {
match &mut me.in_addr {
InAddr::Any { if_watch, addrs } => match if_watch {
// If we listen on all interfaces, wait for `if-watch` to be ready.
IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) {
Ok(w) => {
*if_watch = IfWatch::Ready(w);
continue;
match &mut me.in_addr {
InAddr::Any(if_watcher) => {
while let Poll::Ready(ev) = if_watcher.as_mut().poll_if_event(cx) {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
match ev {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("New listen address: {}", ma);
me.port_reuse.register(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma))));
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("Expired listen address: {}", ma);
me.port_reuse.unregister(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma))));
}
}
Err(err) => {
log::debug! {
"Failed to begin observing interfaces: {:?}. Scheduling retry.",
"Failure polling interfaces: {:?}. Scheduling retry.",
err
};
*if_watch = IfWatch::Pending(T::if_watcher());
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err))));
}
},
// Consume all events for up/down interface changes.
IfWatch::Ready(watch) => {
while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) {
match ev {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip)
{
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("New listen address: {}", ma);
me.port_reuse.register(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(
TcpListenerEvent::NewAddress(ma),
)));
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip)
{
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
log::debug!("Expired listen address: {}", ma);
me.port_reuse.unregister(ip, me.listen_addr.port());
return Poll::Ready(Some(Ok(
TcpListenerEvent::AddressExpired(ma),
)));
}
}
Err(err) => {
log::debug! {
"Failure polling interfaces: {:?}. Scheduling retry.",
err
};
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err))));
}
}
}
}
},
// If the listener is bound to a single interface, make sure the
// address is registered for port reuse and reported once.
InAddr::One { addr, out } => {
if let Some(multiaddr) = out.take() {
me.port_reuse.register(*addr, me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr))));
}
}
}

if let Some(mut pause) = me.pause.take() {
match Pin::new(&mut pause).poll(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
me.pause = Some(pause);
return Poll::Pending;
}
// If the listener is bound to a single interface, make sure the
// address is registered for port reuse and reported once.
InAddr::One(out) => {
if let Some(multiaddr) = out.take() {
me.port_reuse
.register(me.listen_addr.ip(), me.listen_addr.port());
return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr))));
}
}
}

// Take the pending connection from the backlog.
let incoming = match T::poll_accept(&mut me.listener, cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(incoming)) => incoming,
Poll::Ready(Err(e)) => {
// These errors are non-fatal for the listener stream.
log::error!("error accepting incoming connection: {}", e);
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e))));
if let Some(mut pause) = me.pause.take() {
match Pin::new(&mut pause).poll(cx) {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(_) => {}
Poll::Pending => {
me.pause = Some(pause);
return Poll::Pending;
}
};
}
}

let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
let remote_addr =
ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());
// Take the pending connection from the backlog.
let incoming = match T::poll_accept(&mut me.listener, cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(incoming)) => incoming,
Poll::Ready(Err(e)) => {
// These errors are non-fatal for the listener stream.
log::error!("error accepting incoming connection: {}", e);
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e))));
}
};

log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);
let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());

return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade {
upgrade: future::ok(incoming.stream),
local_addr,
remote_addr,
})));
}
log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);

Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade {
upgrade: future::ok(incoming.stream),
local_addr,
remote_addr,
})))
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
18 changes: 0 additions & 18 deletions transports/tcp/src/provider.rs
Expand Up @@ -28,18 +28,10 @@ pub mod tokio;

use futures::future::BoxFuture;
use futures::io::{AsyncRead, AsyncWrite};
use ipnet::IpNet;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::task::{Context, Poll};
use std::{fmt, io};

/// An event relating to a change of availability of an address
/// on a network interface.
pub enum IfEvent {
Up(IpNet),
Down(IpNet),
}

/// An incoming connection returned from [`Provider::poll_accept()`].
pub struct Incoming<S> {
pub stream: S,
Expand All @@ -54,12 +46,6 @@ pub trait Provider: Clone + Send + 'static {
type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug;
/// The type of TCP listeners obtained from [`Provider::new_listener`].
type Listener: Send + Unpin;
/// The type of network interface observers obtained from [`Provider::if_watcher`].
type IfWatcher: Send + Unpin;

/// Creates an instance of [`Self::IfWatcher`] that can be polled for
/// network interface changes via [`Self::poll_interfaces`].
fn if_watcher() -> BoxFuture<'static, io::Result<Self::IfWatcher>>;

/// Creates a new listener wrapping the given [`TcpListener`] that
/// can be polled for incoming connections via [`Self::poll_accept()`].
Expand All @@ -77,8 +63,4 @@ pub trait Provider: Clone + Send + 'static {
_: &mut Self::Listener,
_: &mut Context<'_>,
) -> Poll<io::Result<Incoming<Self::Stream>>>;

/// Polls a [`Self::IfWatcher`] for network interface changes, ensuring a task wakeup,
/// if necessary.
fn poll_interfaces(_: &mut Self::IfWatcher, _: &mut Context<'_>) -> Poll<io::Result<IfEvent>>;
}
14 changes: 1 addition & 13 deletions transports/tcp/src/provider/async_io.rs
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use super::{IfEvent, Incoming, Provider};
use super::{Incoming, Provider};

use async_io_crate::Async;
use futures::future::{BoxFuture, FutureExt};
Expand All @@ -32,11 +32,6 @@ pub enum Tcp {}
impl Provider for Tcp {
type Stream = Async<net::TcpStream>;
type Listener = Async<net::TcpListener>;
type IfWatcher = if_watch::IfWatcher;

fn if_watcher() -> BoxFuture<'static, io::Result<Self::IfWatcher>> {
if_watch::IfWatcher::new().boxed()
}

fn new_listener(l: net::TcpListener) -> io::Result<Self::Listener> {
Async::new(l)
Expand Down Expand Up @@ -87,11 +82,4 @@ impl Provider for Tcp {
remote_addr,
}))
}

fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll<io::Result<IfEvent>> {
w.poll_unpin(cx).map_ok(|e| match e {
if_watch::IfEvent::Up(a) => IfEvent::Up(a),
if_watch::IfEvent::Down(a) => IfEvent::Down(a),
})
}
}