From 9a605b6ec72cf9097331dbef283c3d0ca7a665cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Tue, 23 Aug 2022 22:02:35 -0400 Subject: [PATCH 01/30] ci(*): testing workflow --- .github/workflows/testing.yml | 134 ++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 .github/workflows/testing.yml diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml new file mode 100644 index 00000000000..3bd0559dd25 --- /dev/null +++ b/.github/workflows/testing.yml @@ -0,0 +1,134 @@ +name: Continuous integration + +on: + workflow_dispatch + +jobs: + testing-all-features: + name: Build and test + runs-on: ubuntu-latest + strategy: + matrix: + args: [ + "--no-default-features", + "--all-features", + "--benches --all-features", + ] + steps: + + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 + with: + access_token: ${{ github.token }} + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + + - uses: actions/checkout@v3 + + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 + with: + key: ${{ matrix.args }} + + - run: cargo test --workspace ${{ matrix.args }} + + check-rustdoc-links: + name: Check rustdoc intra-doc links + runs-on: ubuntu-latest + steps: + + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 + with: + access_token: ${{ github.token }} + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + + - uses: actions/checkout@v3 + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 + with: + profile: minimal + toolchain: stable + override: true + + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 + + - name: Check rustdoc links + run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items --all-features + + check-clippy: + runs-on: ubuntu-latest + steps: + + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 + with: + access_token: ${{ github.token }} + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + + - uses: actions/checkout@v3 + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 + with: + profile: minimal + toolchain: stable + override: true + components: clippy + + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 + + - name: Run cargo clippy + uses: actions-rs/cargo@844f36862e911db73fe0815f00a4a2602c279505 # v1.0.3 + with: + command: custom-clippy # cargo alias to allow reuse of config locally + + integration-test: + name: Integration tests + runs-on: ubuntu-latest + steps: + + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 + with: + access_token: ${{ github.token }} + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + + - uses: actions/checkout@v3 + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 + with: + profile: minimal + toolchain: stable + override: true + + - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 + + - name: Run ipfs-kad example + run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad + + rustfmt: + runs-on: ubuntu-latest + steps: + + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 + with: + access_token: ${{ github.token }} + + - uses: actions/checkout@v3 + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 + with: + profile: minimal + toolchain: stable + override: true + components: rustfmt + + - name: Check formatting + run: cargo fmt -- --check From 002a533443564ccc0c2511e6a4172498332b2f42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Tue, 23 Aug 2022 22:04:39 -0400 Subject: [PATCH 02/30] ci(*): update workflow name --- .github/workflows/testing.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 3bd0559dd25..eb77060a8ee 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -1,4 +1,4 @@ -name: Continuous integration +name: Pre testing on: workflow_dispatch From c75f3c2b52f37ed87772cbf91f08dde41a56f6cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 1 Sep 2022 16:23:17 -0400 Subject: [PATCH 03/30] Permit use mdns with Tokio or AsyncIO runtime --- Cargo.toml | 7 +- protocols/mdns/Cargo.toml | 23 ++- protocols/mdns/src/behaviour.rs | 14 +- protocols/mdns/src/behaviour/iface.rs | 105 +++++----- protocols/mdns/src/behaviour/socket.rs | 182 ++++++++++++++++++ protocols/mdns/src/behaviour/timer.rs | 111 +++++++++++ .../tests/{smoke.rs => with-async-std.rs} | 27 --- protocols/mdns/tests/with-tokio.rs | 141 ++++++++++++++ src/lib.rs | 3 +- 9 files changed, 520 insertions(+), 93 deletions(-) create mode 100644 protocols/mdns/src/behaviour/socket.rs create mode 100644 protocols/mdns/src/behaviour/timer.rs rename protocols/mdns/tests/{smoke.rs => with-async-std.rs} (87%) create mode 100644 protocols/mdns/tests/with-tokio.rs diff --git a/Cargo.toml b/Cargo.toml index b64c2f68756..b5221484e17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ default = [ "identify", "kad", "gossipsub", - "mdns", + "mdns-async-io", "mplex", "noise", "ping", @@ -46,7 +46,8 @@ identify = ["dep:libp2p-identify", "libp2p-metrics?/identify"] kad = ["dep:libp2p-kad", "libp2p-metrics?/kad"] gossipsub = ["dep:libp2p-gossipsub", "libp2p-metrics?/gossipsub"] metrics = ["dep:libp2p-metrics"] -mdns = ["dep:libp2p-mdns"] +mdns-async-io = ["dep:libp2p-mdns", "libp2p-mdns?/async-io"] +mdns-tokio = ["dep:libp2p-mdns", "libp2p-mdns?/tokio"] mplex = ["dep:libp2p-mplex"] noise = ["dep:libp2p-noise"] ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] @@ -106,7 +107,7 @@ smallvec = "1.6.1" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true } libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false } -libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true } +libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false } libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true } libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true } diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 306f052cc69..168df699d32 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -11,7 +11,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-io = "1.3.1" data-encoding = "2.3.2" dns-parser = "0.8.0" futures = "0.3.13" @@ -25,8 +24,26 @@ smallvec = "1.6.1" socket2 = { version = "0.4.0", features = ["all"] } void = "1.0.2" +async-io = { version = "1.3.1", optional = true } +tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true} + +[features] +default = [] +tokio = ["dep:tokio"] +async-io = ["dep:async-io"] + [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } env_logger = "0.9.0" -libp2p = { path = "../..", default-features = false, features = ["mdns", "tcp-async-io", "dns-async-std", "websocket", "noise", "mplex", "yamux"] } -tokio = { version = "1.15", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } +libp2p = { path = "../..", default-features = false, features = ["mdns-async-io", "tcp-async-io", "dns-async-std", "tcp-tokio", "dns-tokio", "websocket", "noise", "mplex", "yamux"] } +tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } + + +[[test]] +name = "with-async-std" +required-features = ["async-io"] + +[[test]] +name = "with-tokio" +required-features = ["tokio"] + diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 244b2b784dd..349ea1550e9 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -19,10 +19,13 @@ // DEALINGS IN THE SOFTWARE. mod iface; +mod socket; +mod timer; use self::iface::InterfaceState; +use self::timer::{time::AsyncTimer, TimerBuilder}; + use crate::MdnsConfig; -use async_io::Timer; use futures::prelude::*; use if_watch::{IfEvent, IfWatcher}; use libp2p_core::transport::ListenerId; @@ -57,7 +60,7 @@ pub struct Mdns { /// Future that fires when the TTL of at least one node in `discovered_nodes` expires. /// /// `None` if `discovered_nodes` is empty. - closest_expiration: Option, + closest_expiration: Option, } impl Mdns { @@ -91,7 +94,7 @@ impl Mdns { *expires = now; } } - self.closest_expiration = Some(Timer::at(now)); + self.closest_expiration = Some(AsyncTimer::at(now)); } } @@ -219,8 +222,9 @@ impl NetworkBehaviour for Mdns { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } if let Some(closest_expiration) = closest_expiration { - let mut timer = Timer::at(closest_expiration); - let _ = Pin::new(&mut timer).poll(cx); + let mut timer = AsyncTimer::at(closest_expiration); + let _ = Pin::new(&mut timer).poll_tick(cx); + self.closest_expiration = Some(timer); } Poll::Pending diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index e4971e36b1a..e9efdbb7151 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -24,8 +24,12 @@ mod query; use self::dns::{build_query, build_query_response, build_service_discovery_response}; use self::query::MdnsPacket; use crate::MdnsConfig; -use async_io::{Async, Timer}; -use futures::prelude::*; + +use crate::behaviour::{ + socket::{udp::AsyncUdpSocket, AsyncSocket}, + timer::{time::AsyncTimer, TimerBuilder}, +}; + use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_swarm::PollParameters; use socket2::{Domain, Socket, Type}; @@ -34,7 +38,7 @@ use std::{ io, iter, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, pin::Pin, - task::Context, + task::{Context, Poll}, time::{Duration, Instant}, }; @@ -45,9 +49,9 @@ pub struct InterfaceState { /// Address this instance is bound to. addr: IpAddr, /// Receive socket. - recv_socket: Async, + recv_socket: AsyncUdpSocket, /// Send socket. - send_socket: Async, + send_socket: AsyncUdpSocket, /// Buffer used for receiving data from the main socket. /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 /// bytes, if it can be ensured that all participating devices can handle such large packets. @@ -60,7 +64,7 @@ pub struct InterfaceState { /// Discovery interval. query_interval: Duration, /// Discovery timer. - timeout: Timer, + timeout: AsyncTimer, /// Multicast address. multicast_addr: IpAddr, /// Discovered addresses. @@ -83,7 +87,7 @@ impl InterfaceState { socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(255)?; socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; - Async::new(UdpSocket::from(socket))? + AsyncUdpSocket::from_socket(UdpSocket::from(socket))? } IpAddr::V6(_) => { let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?; @@ -94,7 +98,7 @@ impl InterfaceState { socket.set_multicast_loop_v6(true)?; // TODO: find interface matching addr. socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; - Async::new(UdpSocket::from(socket))? + AsyncUdpSocket::from_socket(UdpSocket::from(socket))? } }; let bind_addr = match addr { @@ -107,7 +111,9 @@ impl InterfaceState { SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0) } }; - let send_socket = Async::new(UdpSocket::bind(bind_addr)?)?; + let std_socket = UdpSocket::bind(bind_addr)?; + let send_socket = AsyncUdpSocket::from_socket(std_socket)?; + // randomize timer to prevent all converging and firing at the same time. let query_interval = { use rand::Rng; @@ -127,19 +133,18 @@ impl InterfaceState { send_buffer: Default::default(), discovered: Default::default(), query_interval, - timeout: Timer::interval_at(Instant::now(), query_interval), + timeout: AsyncTimer::interval_at(Instant::now(), query_interval), multicast_addr, ttl: config.ttl, }) } pub fn reset_timer(&mut self) { - self.timeout.set_interval(self.query_interval); + self.timeout = AsyncTimer::interval(self.query_interval); } pub fn fire_timer(&mut self) { - self.timeout - .set_interval_at(Instant::now(), self.query_interval); + self.timeout = AsyncTimer::interval_at(Instant::now(), self.query_interval); } fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { @@ -171,17 +176,14 @@ impl InterfaceState { let new_expiration = Instant::now() + peer.ttl(); - let mut addrs: Vec = Vec::new(); for addr in peer.addresses() { - if let Some(new_addr) = address_translation(addr, &observed) { - addrs.push(new_addr.clone()) - } - addrs.push(addr.clone()) - } + let da = if let Some(new_addr) = address_translation(addr, &observed) { + new_addr.clone() + } else { + addr.clone() + }; - for addr in addrs { - self.discovered - .push_back((*peer.id(), addr, new_expiration)); + self.discovered.push_back((*peer.id(), da, new_expiration)); } } } @@ -198,43 +200,40 @@ impl InterfaceState { params: &impl PollParameters, ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. - while self.recv_socket.poll_readable(cx).is_ready() { - match self - .recv_socket - .recv_from(&mut self.recv_buffer) - .now_or_never() - { - Some(Ok((len, from))) => { - if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) - { - self.inject_mdns_packet(packet, params); - } + match self + .recv_socket + .poll_receive_packet(cx, &mut self.recv_buffer) + { + Poll::Ready(Some((len, from))) => { + if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { + self.inject_mdns_packet(packet, params); } - Some(Err(err)) => log::error!("Failed reading datagram: {}", err), - None => {} } + Poll::Ready(None) | Poll::Pending => {} } + // Send responses. - while self.send_socket.poll_writable(cx).is_ready() { - if let Some(packet) = self.send_buffer.pop_front() { - match self - .send_socket - .send_to(&packet, SocketAddr::new(self.multicast_addr, 5353)) - .now_or_never() - { - Some(Ok(_)) => log::trace!("sent packet on iface {}", self.addr), - Some(Err(err)) => { - log::error!("error sending packet on iface {}: {}", self.addr, err) - } - None => self.send_buffer.push_front(packet), - } - } else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { - log::trace!("sending query on iface {}", self.addr); - self.send_buffer.push_back(build_query()); - } else { - break; + let mut s_buffer = VecDeque::new(); + while let Some(packet) = self.send_buffer.pop_front() { + match self.send_socket.poll_send_packet( + cx, + &packet, + SocketAddr::new(self.multicast_addr, 5353), + ) { + Poll::Ready(_) => log::trace!("sent packet on iface {}", self.addr), + Poll::Pending => s_buffer.push_front(packet), } } + + if !s_buffer.is_empty() { + self.send_buffer = s_buffer; + } + + if Pin::new(&mut self.timeout).poll_tick(cx).is_ready() { + log::trace!("sending query on iface {}", self.addr); + self.send_buffer.push_back(build_query()); + } + // Emit discovered event. self.discovered.pop_front() } diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs new file mode 100644 index 00000000000..cda79c18693 --- /dev/null +++ b/protocols/mdns/src/behaviour/socket.rs @@ -0,0 +1,182 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + net::{SocketAddr, UdpSocket}, + task::{Context, Poll}, +}; + +pub(crate) trait AsyncSocket { + type Socket; + + fn from_socket(socket: UdpSocket) -> std::io::Result; + + fn poll_receive_packet( + &mut self, + _cx: &mut Context, + _buf: &mut [u8], + ) -> Poll> { + Poll::Pending + } + + fn poll_send_packet(&mut self, _cx: &mut Context, _packet: &[u8], _to: SocketAddr) -> Poll<()> { + Poll::Pending + } +} + +#[cfg(feature = "async-io")] +pub(crate) mod udp { + use super::*; + use async_io::Async; + use futures::FutureExt; + + pub type AsyncUdpSocket = Async; + + impl AsyncSocket for AsyncUdpSocket { + type Socket = Self; + + fn from_socket(socket: UdpSocket) -> std::io::Result { + Async::new(socket) + } + + fn poll_receive_packet( + &mut self, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + // Poll receive socket. + if self.poll_readable(cx).is_ready() { + match self.recv_from(buf).now_or_never() { + Some(Ok((len, from))) => { + return Poll::Ready(Some((len, from))); + } + Some(Err(err)) => { + log::error!("Failed reading datagram: {}", err); + return Poll::Ready(None); + } + None => { + return Poll::Ready(None); + } + } + } + Poll::Pending + } + + fn poll_send_packet( + &mut self, + cx: &mut Context, + packet: &[u8], + to: SocketAddr, + ) -> Poll<()> { + if self.poll_writable(cx).is_ready() { + match self.send_to(packet, to).now_or_never() { + Some(Ok(_)) => { + log::trace!("sent packet on iface {}", to); + return Poll::Ready(()); + } + Some(Err(err)) => { + log::error!("error sending packet on iface {}: {}", to, err); + return Poll::Ready(()); + } + None => { + return Poll::Pending; + } + } + } + + Poll::Pending + } + } +} + +#[cfg(feature = "tokio")] +pub(crate) mod udp { + use super::*; + use tokio::net::UdpSocket as TokioUdpSocket; + + pub type AsyncUdpSocket = TokioUdpSocket; + + impl AsyncSocket for AsyncUdpSocket { + type Socket = Self; + + fn from_socket(socket: UdpSocket) -> std::io::Result { + socket.set_nonblocking(true)?; + TokioUdpSocket::from_std(socket) + } + + fn poll_receive_packet( + &mut self, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + match self.poll_recv_ready(cx) { + Poll::Ready(Ok(_)) => match self.try_recv_from(buf) { + Ok((len, from)) => { + return Poll::Ready(Some((len, from))); + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + return Poll::Ready(None); + } + Err(err) => { + log::error!("Failed reading datagram: {}", err); + return Poll::Ready(None); + } + }, + Poll::Ready(Err(e)) => { + log::error!("Failed recv ready datagram: {}", e); + return Poll::Ready(None); + } + _ => {} + } + + Poll::Pending + } + + fn poll_send_packet( + &mut self, + cx: &mut Context, + packet: &[u8], + to: SocketAddr, + ) -> Poll<()> { + match self.poll_send_ready(cx) { + Poll::Ready(Ok(_)) => match self.try_send_to(packet, to) { + Ok(_len) => { + log::trace!("sent packet on iface {}", to); + return Poll::Ready(()); + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + return Poll::Ready(()); + } + Err(err) => { + log::error!("Failed reading datagram: {}", err); + return Poll::Ready(()); + } + }, + Poll::Ready(Err(e)) => { + log::error!("Failed recv ready datagram: {}", e); + return Poll::Ready(()); + } + _ => {} + } + + Poll::Pending + } + } +} diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs new file mode 100644 index 00000000000..7765923e973 --- /dev/null +++ b/protocols/mdns/src/behaviour/timer.rs @@ -0,0 +1,111 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +pub(crate) struct WrapTimer { + timer: T, +} + +pub trait TimerBuilder { + type Item; + + fn at(instant: Instant) -> Self; + + fn interval(duration: Duration) -> Self; + + fn interval_at(start: Instant, duration: Duration) -> Self; + + fn poll_tick(&mut self, cx: &mut Context) -> Poll; +} + +#[cfg(feature = "async-io")] +pub(crate) mod time { + use super::*; + use async_io::Timer; + use futures::Stream; + use std::pin::Pin; + + pub(crate) type AsyncTimer = WrapTimer; + + impl TimerBuilder for WrapTimer { + type Item = Option; + + fn at(instant: Instant) -> Self { + WrapTimer { + timer: Timer::at(instant), + } + } + + fn interval(duration: Duration) -> Self { + WrapTimer { + timer: Timer::interval(duration), + } + } + + fn interval_at(start: Instant, duration: Duration) -> Self { + WrapTimer { + timer: Timer::interval_at(start, duration), + } + } + + fn poll_tick(&mut self, cx: &mut Context) -> Poll { + Pin::new(&mut self.timer).poll_next(cx) + } + } +} + +#[cfg(feature = "tokio")] +pub(crate) mod time { + use super::*; + use tokio::time::{self, Instant as TokioInstant, Interval}; + + pub(crate) type AsyncTimer = WrapTimer; + + impl TimerBuilder for WrapTimer { + type Item = time::Instant; + + fn at(instant: Instant) -> Self { + let timer = time::interval_at( + TokioInstant::from_std(instant), + Duration::new(std::u64::MAX, 1_000_000_000 - 1), + ); + WrapTimer { timer } + } + + fn interval(duration: Duration) -> Self { + WrapTimer { + timer: time::interval(duration), + } + } + + fn interval_at(start: Instant, duration: Duration) -> Self { + WrapTimer { + timer: time::interval_at(TokioInstant::from_std(start), duration), + } + } + + fn poll_tick(&mut self, cx: &mut Context) -> Poll { + self.timer.poll_tick(cx) + } + } +} diff --git a/protocols/mdns/tests/smoke.rs b/protocols/mdns/tests/with-async-std.rs similarity index 87% rename from protocols/mdns/tests/smoke.rs rename to protocols/mdns/tests/with-async-std.rs index d123e5abce7..5207a870e50 100644 --- a/protocols/mdns/tests/smoke.rs +++ b/protocols/mdns/tests/with-async-std.rs @@ -83,11 +83,6 @@ async fn test_discovery_async_std_ipv4() -> Result<(), Box> { run_discovery_test(MdnsConfig::default()).await } -#[tokio::test] -async fn test_discovery_tokio_ipv4() -> Result<(), Box> { - run_discovery_test(MdnsConfig::default()).await -} - #[async_std::test] async fn test_discovery_async_std_ipv6() -> Result<(), Box> { let config = MdnsConfig { @@ -97,15 +92,6 @@ async fn test_discovery_async_std_ipv6() -> Result<(), Box> { run_discovery_test(config).await } -#[tokio::test] -async fn test_discovery_tokio_ipv6() -> Result<(), Box> { - let config = MdnsConfig { - enable_ipv6: true, - ..Default::default() - }; - run_discovery_test(config).await -} - async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; @@ -152,16 +138,3 @@ async fn test_expired_async_std() -> Result<(), Box> { .map_err(|e| Box::new(e) as Box) } -#[tokio::test] -async fn test_expired_tokio() -> Result<(), Box> { - env_logger::try_init().ok(); - let config = MdnsConfig { - ttl: Duration::from_secs(1), - query_interval: Duration::from_secs(10), - ..Default::default() - }; - - tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) - .await - .unwrap() -} diff --git a/protocols/mdns/tests/with-tokio.rs b/protocols/mdns/tests/with-tokio.rs new file mode 100644 index 00000000000..c1440c8a822 --- /dev/null +++ b/protocols/mdns/tests/with-tokio.rs @@ -0,0 +1,141 @@ + +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE.use futures::StreamExt; + +use futures::StreamExt; +use libp2p::{ + identity, + mdns::{Mdns, MdnsConfig, MdnsEvent}, + swarm::{Swarm, SwarmEvent}, + PeerId, +}; +use std::error::Error; +use std::time::Duration; + +async fn create_swarm(config: MdnsConfig) -> Result, Box> { + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(id_keys.public()); + let transport = libp2p::tokio_development_transport(id_keys)?; + let behaviour = Mdns::new(config).await?; + let mut swarm = Swarm::new(transport, behaviour, peer_id); + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + Ok(swarm) +} + +async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { + env_logger::try_init().ok(); + let mut a = create_swarm(config.clone()).await?; + let mut b = create_swarm(config).await?; + let mut discovered_a = false; + let mut discovered_b = false; + loop { + futures::select! { + ev = a.select_next_some() => match ev { + SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => { + for (peer, _addr) in peers { + if peer == *b.local_peer_id() { + if discovered_a { + return Ok(()); + } else { + discovered_b = true; + } + } + } + } + _ => {} + }, + ev = b.select_next_some() => match ev { + SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => { + for (peer, _addr) in peers { + if peer == *a.local_peer_id() { + if discovered_b { + return Ok(()); + } else { + discovered_a = true; + } + } + } + } + _ => {} + } + } + } +} + + +#[tokio::test] +async fn test_discovery_tokio_ipv4() -> Result<(), Box> { + run_discovery_test(MdnsConfig::default()).await +} + + +#[tokio::test] +async fn test_discovery_tokio_ipv6() -> Result<(), Box> { + let config = MdnsConfig { + enable_ipv6: true, + ..Default::default() + }; + run_discovery_test(config).await +} + +async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { + let mut a = create_swarm(config.clone()).await?; + let mut b = create_swarm(config).await?; + + loop { + futures::select! { + ev = a.select_next_some() => match ev { + SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => { + for (peer, _addr) in peers { + if peer == *b.local_peer_id() { + return Ok(()); + } + } + } + _ => {} + }, + ev = b.select_next_some() => match ev { + SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => { + for (peer, _addr) in peers { + if peer == *a.local_peer_id() { + return Ok(()); + } + } + } + _ => {} + } + + } + } +} + +#[tokio::test] +async fn test_expired_tokio() -> Result<(), Box> { + env_logger::try_init().ok(); + let config = MdnsConfig { + ttl: Duration::from_secs(1), + query_interval: Duration::from_secs(10), + ..Default::default() + }; + + tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .unwrap() +} diff --git a/src/lib.rs b/src/lib.rs index 6bb577b1f52..6612984af2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,8 +79,7 @@ pub use libp2p_identify as identify; #[cfg_attr(docsrs, doc(cfg(feature = "kad")))] #[doc(inline)] pub use libp2p_kad as kad; -#[cfg(feature = "mdns")] -#[cfg_attr(docsrs, doc(cfg(feature = "mdns")))] +#[cfg(any(feature = "mdns-async-io", feature = "mdns-tokio"))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] pub use libp2p_mdns as mdns; From 10baffe4631a3d0e7f988a1c7e8f0cfb8c482999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 7 Jul 2022 10:37:20 -0400 Subject: [PATCH 04/30] test(mdns): renamed mdns tests --- protocols/mdns/tests/{with-async-std.rs => use-async-std.rs} | 1 - protocols/mdns/tests/{with-tokio.rs => use-tokio.rs} | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) rename protocols/mdns/tests/{with-async-std.rs => use-async-std.rs} (99%) rename protocols/mdns/tests/{with-tokio.rs => use-tokio.rs} (99%) diff --git a/protocols/mdns/tests/with-async-std.rs b/protocols/mdns/tests/use-async-std.rs similarity index 99% rename from protocols/mdns/tests/with-async-std.rs rename to protocols/mdns/tests/use-async-std.rs index 5207a870e50..66ba28b5643 100644 --- a/protocols/mdns/tests/with-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -137,4 +137,3 @@ async fn test_expired_async_std() -> Result<(), Box> { .map(|_| ()) .map_err(|e| Box::new(e) as Box) } - diff --git a/protocols/mdns/tests/with-tokio.rs b/protocols/mdns/tests/use-tokio.rs similarity index 99% rename from protocols/mdns/tests/with-tokio.rs rename to protocols/mdns/tests/use-tokio.rs index c1440c8a822..28fd2b95d78 100644 --- a/protocols/mdns/tests/with-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -1,4 +1,3 @@ - // Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a @@ -18,6 +17,7 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE.use futures::StreamExt; +extern crate tokio_crate as tokio; use futures::StreamExt; use libp2p::{ @@ -79,13 +79,11 @@ async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { } } - #[tokio::test] async fn test_discovery_tokio_ipv4() -> Result<(), Box> { run_discovery_test(MdnsConfig::default()).await } - #[tokio::test] async fn test_discovery_tokio_ipv6() -> Result<(), Box> { let config = MdnsConfig { From a330300f380041094abdf6ae44618b1e70fff72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 7 Jul 2022 11:11:53 -0400 Subject: [PATCH 05/30] fix(protocols/mdns): fix high cpu usage with tokio library **Problem** When use the Tokio library with mdns module, it is polling every time the UDP socket; causing hight CPU usage. **Solution** When using Tokio library, use the UppSocket provided by the Tokio library instead of the Async-io wrapper. **Breaking Change** After this fix exist two features ```async-io```, ```tokio``` for this module, for the general module must be use the features ```mdns-async-io``` or ```mdns-tokio``` for async-io or tokio libraries --- Cargo.toml | 2 +- protocols/mdns/Cargo.toml | 16 ++++---- protocols/mdns/src/behaviour.rs | 15 +++++++- protocols/mdns/src/behaviour/iface.rs | 39 ++++++++++--------- protocols/mdns/src/behaviour/socket.rs | 53 +++++++++++++++++--------- protocols/mdns/src/behaviour/timer.rs | 33 ++++++++++++---- 6 files changed, 106 insertions(+), 52 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b5221484e17..7249e70cb4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,7 +161,7 @@ required-features = ["floodsub"] [[example]] name = "chat-tokio" -required-features = ["tcp-tokio", "mdns"] +required-features = ["tcp-tokio", "mdns-tokio"] [[example]] name = "file-sharing" diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 168df699d32..a754d7db2b1 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -24,26 +24,26 @@ smallvec = "1.6.1" socket2 = { version = "0.4.0", features = ["all"] } void = "1.0.2" -async-io = { version = "1.3.1", optional = true } -tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true} +async-io-crate = { package = "async-io", version = "1.3.1", optional = true } +tokio-crate = { package = "tokio", version = "1.19", default-features = false, features = ["net", "time"], optional = true} [features] -default = [] -tokio = ["dep:tokio"] -async-io = ["dep:async-io"] +default = ["async-io"] +tokio = ["tokio-crate"] +async-io = ["async-io-crate"] [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } env_logger = "0.9.0" libp2p = { path = "../..", default-features = false, features = ["mdns-async-io", "tcp-async-io", "dns-async-std", "tcp-tokio", "dns-tokio", "websocket", "noise", "mplex", "yamux"] } -tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } +tokio-crate = {package = "tokio", version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } [[test]] -name = "with-async-std" +name = "use-async-std" required-features = ["async-io"] [[test]] -name = "with-tokio" +name = "use-tokio" required-features = ["tokio"] diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 349ea1550e9..4f8476a61b9 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -23,7 +23,8 @@ mod socket; mod timer; use self::iface::InterfaceState; -use self::timer::{time::AsyncTimer, TimerBuilder}; + +use crate::behaviour::timer::TimerBuilder; use crate::MdnsConfig; use futures::prelude::*; @@ -38,6 +39,12 @@ use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; +#[cfg(feature = "async-io")] +use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer}; + +#[cfg(feature = "tokio")] +use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::AsyncTimer}; + /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. #[derive(Debug)] @@ -49,7 +56,11 @@ pub struct Mdns { if_watch: IfWatcher, /// Mdns interface states. - iface_states: HashMap, + #[cfg(feature = "async-io")] + iface_states: HashMap>, + + #[cfg(feature = "tokio")] + iface_states: HashMap>, /// List of nodes that we have discovered, the address, and when their TTL expires. /// diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index e9efdbb7151..792d60c46a0 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -25,10 +25,7 @@ use self::dns::{build_query, build_query_response, build_service_discovery_respo use self::query::MdnsPacket; use crate::MdnsConfig; -use crate::behaviour::{ - socket::{udp::AsyncUdpSocket, AsyncSocket}, - timer::{time::AsyncTimer, TimerBuilder}, -}; +use crate::behaviour::{socket::AsyncSocket, timer::TimerBuilder}; use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_swarm::PollParameters; @@ -45,13 +42,17 @@ use std::{ /// An mDNS instance for a networking interface. To discover all peers when having multiple /// interfaces an [`InterfaceState`] is required for each interface. #[derive(Debug)] -pub struct InterfaceState { +pub struct InterfaceState +where + U: AsyncSocket, + T: TimerBuilder + std::marker::Unpin, +{ /// Address this instance is bound to. addr: IpAddr, /// Receive socket. - recv_socket: AsyncUdpSocket, + recv_socket: U, /// Send socket. - send_socket: AsyncUdpSocket, + send_socket: U, /// Buffer used for receiving data from the main socket. /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 /// bytes, if it can be ensured that all participating devices can handle such large packets. @@ -64,7 +65,7 @@ pub struct InterfaceState { /// Discovery interval. query_interval: Duration, /// Discovery timer. - timeout: AsyncTimer, + timeout: T, /// Multicast address. multicast_addr: IpAddr, /// Discovered addresses. @@ -73,7 +74,11 @@ pub struct InterfaceState { ttl: Duration, } -impl InterfaceState { +impl InterfaceState +where + U: AsyncSocket, + T: TimerBuilder + std::marker::Unpin, +{ /// Builds a new [`InterfaceState`]. pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result { log::info!("creating instance on iface {}", addr); @@ -87,7 +92,7 @@ impl InterfaceState { socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(255)?; socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; - AsyncUdpSocket::from_socket(UdpSocket::from(socket))? + U::from_socket(UdpSocket::from(socket))? } IpAddr::V6(_) => { let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?; @@ -98,7 +103,7 @@ impl InterfaceState { socket.set_multicast_loop_v6(true)?; // TODO: find interface matching addr. socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; - AsyncUdpSocket::from_socket(UdpSocket::from(socket))? + U::from_socket(UdpSocket::from(socket))? } }; let bind_addr = match addr { @@ -112,7 +117,7 @@ impl InterfaceState { } }; let std_socket = UdpSocket::bind(bind_addr)?; - let send_socket = AsyncUdpSocket::from_socket(std_socket)?; + let send_socket = U::from_socket(std_socket)?; // randomize timer to prevent all converging and firing at the same time. let query_interval = { @@ -133,18 +138,18 @@ impl InterfaceState { send_buffer: Default::default(), discovered: Default::default(), query_interval, - timeout: AsyncTimer::interval_at(Instant::now(), query_interval), + timeout: T::interval_at(Instant::now(), query_interval), multicast_addr, ttl: config.ttl, }) } pub fn reset_timer(&mut self) { - self.timeout = AsyncTimer::interval(self.query_interval); + self.timeout = T::interval(self.query_interval); } pub fn fire_timer(&mut self) { - self.timeout = AsyncTimer::interval_at(Instant::now(), self.query_interval); + self.timeout = T::interval_at(Instant::now(), self.query_interval); } fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { @@ -214,7 +219,7 @@ impl InterfaceState { // Send responses. let mut s_buffer = VecDeque::new(); - while let Some(packet) = self.send_buffer.pop_front() { + while let Some(packet) = self.send_buffer.pop_front() { match self.send_socket.poll_send_packet( cx, &packet, @@ -224,7 +229,7 @@ impl InterfaceState { Poll::Pending => s_buffer.push_front(packet), } } - + if !s_buffer.is_empty() { self.send_buffer = s_buffer; } diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index cda79c18693..e19d804c80d 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -23,11 +23,17 @@ use std::{ task::{Context, Poll}, }; -pub(crate) trait AsyncSocket { - type Socket; - - fn from_socket(socket: UdpSocket) -> std::io::Result; - +/// Interface that must be implemented by the different runtimes to use the UdpSocket in async mode +/// +pub trait AsyncSocket { + /// Create the async socket from the ```std::net::UdpSocket``` + /// + fn from_socket(socket: UdpSocket) -> std::io::Result + where + Self: Sized; + + /// Attempts to receive a single packet on the socket from the remote address to which it is connected. + /// fn poll_receive_packet( &mut self, _cx: &mut Context, @@ -36,26 +42,32 @@ pub(crate) trait AsyncSocket { Poll::Pending } + /// Attempts to send data on the socket to a given address. + /// fn poll_send_packet(&mut self, _cx: &mut Context, _packet: &[u8], _to: SocketAddr) -> Poll<()> { Poll::Pending } } #[cfg(feature = "async-io")] -pub(crate) mod udp { +pub mod asio { use super::*; - use async_io::Async; + use async_io_crate::Async; use futures::FutureExt; + /// AsyncIo UdpSocket + /// pub type AsyncUdpSocket = Async; impl AsyncSocket for AsyncUdpSocket { - type Socket = Self; - - fn from_socket(socket: UdpSocket) -> std::io::Result { + /// Create the async socket from the ```std::net::UdpSocket``` + /// + fn from_socket(socket: UdpSocket) -> std::io::Result { Async::new(socket) } + /// Attempts to receive a single packet on the socket from the remote address to which it is connected. + /// fn poll_receive_packet( &mut self, cx: &mut Context, @@ -79,6 +91,8 @@ pub(crate) mod udp { Poll::Pending } + /// Attempts to send data on the socket to a given address. + /// fn poll_send_packet( &mut self, cx: &mut Context, @@ -107,20 +121,23 @@ pub(crate) mod udp { } #[cfg(feature = "tokio")] -pub(crate) mod udp { +pub mod tokio { use super::*; - use tokio::net::UdpSocket as TokioUdpSocket; + use tokio_crate::net::UdpSocket as TkUdpSocket; - pub type AsyncUdpSocket = TokioUdpSocket; - - impl AsyncSocket for AsyncUdpSocket { - type Socket = Self; + /// Tokio ASync Socket` + pub type TokioUdpSocket = TkUdpSocket; - fn from_socket(socket: UdpSocket) -> std::io::Result { + impl AsyncSocket for TokioUdpSocket { + /// Create the async socket from the ```std::net::UdpSocket``` + /// + fn from_socket(socket: UdpSocket) -> std::io::Result { socket.set_nonblocking(true)?; TokioUdpSocket::from_std(socket) } + /// Attempts to receive a single packet on the socket from the remote address to which it is connected. + /// fn poll_receive_packet( &mut self, cx: &mut Context, @@ -149,6 +166,8 @@ pub(crate) mod udp { Poll::Pending } + /// Attempts to send data on the socket to a given address. + /// fn poll_send_packet( &mut self, cx: &mut Context, diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index 7765923e973..aeb7ff8074f 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -21,53 +21,66 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +/// Simple wrapper for the differents type of timers +/// #[derive(Debug)] -pub(crate) struct WrapTimer { +pub struct WrapTimer { timer: T, } +/// Builder interface to homogenize the differents implementations +/// pub trait TimerBuilder { type Item; + /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self; + /// Creates a timer that emits events periodically. fn interval(duration: Duration) -> Self; + /// Creates a timer that emits events periodically, starting at start. fn interval_at(start: Instant, duration: Duration) -> Self; + /// Poll the timer fn poll_tick(&mut self, cx: &mut Context) -> Poll; } #[cfg(feature = "async-io")] -pub(crate) mod time { +pub mod asio { use super::*; - use async_io::Timer; + use async_io_crate::Timer; use futures::Stream; use std::pin::Pin; - pub(crate) type AsyncTimer = WrapTimer; + /// Async Timer + pub type AsyncTimer = WrapTimer; impl TimerBuilder for WrapTimer { type Item = Option; + /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self { WrapTimer { timer: Timer::at(instant), } } + /// Creates a timer that emits events periodically. fn interval(duration: Duration) -> Self { WrapTimer { timer: Timer::interval(duration), } } + /// Creates a timer that emits events periodically, starting at start. fn interval_at(start: Instant, duration: Duration) -> Self { WrapTimer { timer: Timer::interval_at(start, duration), } } + /// Poll the timer fn poll_tick(&mut self, cx: &mut Context) -> Poll { Pin::new(&mut self.timer).poll_next(cx) } @@ -75,16 +88,19 @@ pub(crate) mod time { } #[cfg(feature = "tokio")] -pub(crate) mod time { +pub mod tokio { use super::*; - use tokio::time::{self, Instant as TokioInstant, Interval}; + use tokio_crate::time::{self, Instant as TokioInstant, Interval}; - pub(crate) type AsyncTimer = WrapTimer; + /// Tokio wrapper + pub type AsyncTimer = WrapTimer; impl TimerBuilder for WrapTimer { type Item = time::Instant; + /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self { + // Taken from: https://docs.rs/async-io/1.7.0/src/async_io/lib.rs.html#91 let timer = time::interval_at( TokioInstant::from_std(instant), Duration::new(std::u64::MAX, 1_000_000_000 - 1), @@ -92,18 +108,21 @@ pub(crate) mod time { WrapTimer { timer } } + /// Creates a timer that emits events periodically. fn interval(duration: Duration) -> Self { WrapTimer { timer: time::interval(duration), } } + /// Creates a timer that emits events periodically, starting at start. fn interval_at(start: Instant, duration: Duration) -> Self { WrapTimer { timer: time::interval_at(TokioInstant::from_std(start), duration), } } + /// Poll the timer fn poll_tick(&mut self, cx: &mut Context) -> Poll { self.timer.poll_tick(cx) } From 084a6654219a7bd85649da2918d20caf9cee8e7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 7 Jul 2022 12:23:28 -0400 Subject: [PATCH 06/30] fix(examples/chat-tokio):update mdns-tokio feature --- examples/chat-tokio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 0bd44bdabdc..f720db9a317 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -33,7 +33,7 @@ //! ```sh //!cargo run --example chat-tokio \\ //! --no-default-features \\ -//! --features="floodsub mplex noise tcp-tokio mdns" +//! --features="floodsub mplex noise tcp-tokio mdns-tokio" //! ``` use futures::StreamExt; From 4ee1c701d49ecbcebb575665e1ffe5830deb0e97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 7 Jul 2022 12:31:29 -0400 Subject: [PATCH 07/30] chore(protocols/mdns): update changelog --- protocols/mdns/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index a6b05044fa5..7a43905ec5d 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -9,6 +9,8 @@ - Update to `libp2p-core` `v0.35.0`. +- fix(protocols/mdns): fix high cpu usage with tokio library. + # 0.38.0 - Update to `libp2p-core` `v0.34.0`. From 1f19654b4f4bc92a314e239a38fa9df19310ed1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sat, 9 Jul 2022 00:45:09 -0400 Subject: [PATCH 08/30] fix(protocols/mdns):permit build with all features **Breaking Changes** After this change must be used `Mdns` or `TokioMdns`, depending on the enabled features --- examples/chat-tokio.rs | 10 ++++-- protocols/mdns/src/behaviour.rs | 42 ++++++++++++++++++-------- protocols/mdns/src/behaviour/iface.rs | 4 +-- protocols/mdns/src/behaviour/socket.rs | 13 +------- protocols/mdns/src/behaviour/timer.rs | 13 ++++---- protocols/mdns/src/lib.rs | 12 ++++++-- protocols/mdns/tests/use-tokio.rs | 6 ++-- 7 files changed, 58 insertions(+), 42 deletions(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index f720db9a317..111aeb6a6e8 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -41,7 +41,11 @@ use libp2p::{ core::upgrade, floodsub::{self, Floodsub, FloodsubEvent}, identity, - mdns::{Mdns, MdnsEvent}, + mdns::{ + // `MdnsTokio` is available through the `mdns-tokio` feature. + TokioMdns, + MdnsEvent + }, mplex, noise, swarm::{SwarmBuilder, SwarmEvent}, @@ -88,7 +92,7 @@ async fn main() -> Result<(), Box> { #[behaviour(out_event = "MyBehaviourEvent")] struct MyBehaviour { floodsub: Floodsub, - mdns: Mdns, + mdns: TokioMdns, } #[allow(clippy::large_enum_variant)] @@ -111,7 +115,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events. let mut swarm = { - let mdns = Mdns::new(Default::default()).await?; + let mdns = TokioMdns::new(Default::default()).await?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(peer_id), mdns, diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 4f8476a61b9..410e3d61337 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -24,7 +24,7 @@ mod timer; use self::iface::InterfaceState; -use crate::behaviour::timer::TimerBuilder; +use crate::behaviour::{socket::AsyncSocket, timer::TimerBuilder}; use crate::MdnsConfig; use futures::prelude::*; @@ -42,13 +42,25 @@ use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time:: #[cfg(feature = "async-io")] use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer}; +/// The type of a [`GenMdns`] using the `async-io` implementation. +#[cfg(feature = "async-io")] +pub type Mdns = GenMdns; + #[cfg(feature = "tokio")] -use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::AsyncTimer}; +use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer}; + +/// The type of a [`GenMdns`] using the `tokio` implementation. +#[cfg(feature = "tokio")] +pub type TokioMdns = GenMdns; /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. #[derive(Debug)] -pub struct Mdns { +pub struct GenMdns +where + T: TimerBuilder, + S: AsyncSocket, +{ /// InterfaceState config. config: MdnsConfig, @@ -56,11 +68,7 @@ pub struct Mdns { if_watch: IfWatcher, /// Mdns interface states. - #[cfg(feature = "async-io")] - iface_states: HashMap>, - - #[cfg(feature = "tokio")] - iface_states: HashMap>, + iface_states: HashMap>, /// List of nodes that we have discovered, the address, and when their TTL expires. /// @@ -71,10 +79,14 @@ pub struct Mdns { /// Future that fires when the TTL of at least one node in `discovered_nodes` expires. /// /// `None` if `discovered_nodes` is empty. - closest_expiration: Option, + closest_expiration: Option, } -impl Mdns { +impl GenMdns +where + T: TimerBuilder, + S: AsyncSocket, +{ /// Builds a new `Mdns` behaviour. pub async fn new(config: MdnsConfig) -> io::Result { let if_watch = if_watch::IfWatcher::new().await?; @@ -105,11 +117,15 @@ impl Mdns { *expires = now; } } - self.closest_expiration = Some(AsyncTimer::at(now)); + self.closest_expiration = Some(T::at(now)); } } -impl NetworkBehaviour for Mdns { +impl NetworkBehaviour for GenMdns +where + T: TimerBuilder, + S: AsyncSocket, +{ type ConnectionHandler = DummyConnectionHandler; type OutEvent = MdnsEvent; @@ -233,7 +249,7 @@ impl NetworkBehaviour for Mdns { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } if let Some(closest_expiration) = closest_expiration { - let mut timer = AsyncTimer::at(closest_expiration); + let mut timer = T::at(closest_expiration); let _ = Pin::new(&mut timer).poll_tick(cx); self.closest_expiration = Some(timer); diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 792d60c46a0..3c73f8e3546 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -45,7 +45,7 @@ use std::{ pub struct InterfaceState where U: AsyncSocket, - T: TimerBuilder + std::marker::Unpin, + T: TimerBuilder, { /// Address this instance is bound to. addr: IpAddr, @@ -77,7 +77,7 @@ where impl InterfaceState where U: AsyncSocket, - T: TimerBuilder + std::marker::Unpin, + T: TimerBuilder, { /// Builds a new [`InterfaceState`]. pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result { diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index e19d804c80d..929078d25e5 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -24,16 +24,13 @@ use std::{ }; /// Interface that must be implemented by the different runtimes to use the UdpSocket in async mode -/// -pub trait AsyncSocket { +pub trait AsyncSocket: Send + 'static { /// Create the async socket from the ```std::net::UdpSocket``` - /// fn from_socket(socket: UdpSocket) -> std::io::Result where Self: Sized; /// Attempts to receive a single packet on the socket from the remote address to which it is connected. - /// fn poll_receive_packet( &mut self, _cx: &mut Context, @@ -43,7 +40,6 @@ pub trait AsyncSocket { } /// Attempts to send data on the socket to a given address. - /// fn poll_send_packet(&mut self, _cx: &mut Context, _packet: &[u8], _to: SocketAddr) -> Poll<()> { Poll::Pending } @@ -56,18 +52,15 @@ pub mod asio { use futures::FutureExt; /// AsyncIo UdpSocket - /// pub type AsyncUdpSocket = Async; impl AsyncSocket for AsyncUdpSocket { /// Create the async socket from the ```std::net::UdpSocket``` - /// fn from_socket(socket: UdpSocket) -> std::io::Result { Async::new(socket) } /// Attempts to receive a single packet on the socket from the remote address to which it is connected. - /// fn poll_receive_packet( &mut self, cx: &mut Context, @@ -92,7 +85,6 @@ pub mod asio { } /// Attempts to send data on the socket to a given address. - /// fn poll_send_packet( &mut self, cx: &mut Context, @@ -130,14 +122,12 @@ pub mod tokio { impl AsyncSocket for TokioUdpSocket { /// Create the async socket from the ```std::net::UdpSocket``` - /// fn from_socket(socket: UdpSocket) -> std::io::Result { socket.set_nonblocking(true)?; TokioUdpSocket::from_std(socket) } /// Attempts to receive a single packet on the socket from the remote address to which it is connected. - /// fn poll_receive_packet( &mut self, cx: &mut Context, @@ -167,7 +157,6 @@ pub mod tokio { } /// Attempts to send data on the socket to a given address. - /// fn poll_send_packet( &mut self, cx: &mut Context, diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index aeb7ff8074f..9d327e1af4a 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -18,19 +18,20 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::{ + marker::Unpin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; /// Simple wrapper for the differents type of timers -/// #[derive(Debug)] pub struct WrapTimer { timer: T, } /// Builder interface to homogenize the differents implementations -/// -pub trait TimerBuilder { +pub trait TimerBuilder: Send + Unpin + 'static { type Item; /// Creates a timer that emits an event once at the given time instant. @@ -93,7 +94,7 @@ pub mod tokio { use tokio_crate::time::{self, Instant as TokioInstant, Interval}; /// Tokio wrapper - pub type AsyncTimer = WrapTimer; + pub type TokioTimer = WrapTimer; impl TimerBuilder for WrapTimer { type Item = time::Instant; diff --git a/protocols/mdns/src/lib.rs b/protocols/mdns/src/lib.rs index a99eab691a2..3b484c91daa 100644 --- a/protocols/mdns/src/lib.rs +++ b/protocols/mdns/src/lib.rs @@ -26,16 +26,22 @@ //! //! # Usage //! -//! This crate provides the `Mdns` struct which implements the `NetworkBehaviour` trait. This -//! struct will automatically discover other libp2p nodes on the local network. +//! This crate provides a `Mdns` and `TokioMdns`, depending on the enabled features, which +//! implements the `NetworkBehaviour` trait. This struct will automatically discover other +//! libp2p nodes on the local network. //! use lazy_static::lazy_static; use std::net::{Ipv4Addr, Ipv6Addr}; use std::time::Duration; mod behaviour; +pub use crate::behaviour::{GenMdns, MdnsEvent}; -pub use crate::behaviour::{Mdns, MdnsEvent}; +#[cfg(feature = "async-io")] +pub use crate::behaviour::Mdns; + +#[cfg(feature = "tokio")] +pub use crate::behaviour::TokioMdns; /// The DNS service name for all libp2p peers used to query for addresses. const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 28fd2b95d78..cdc984a3597 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -22,18 +22,18 @@ extern crate tokio_crate as tokio; use futures::StreamExt; use libp2p::{ identity, - mdns::{Mdns, MdnsConfig, MdnsEvent}, + mdns::{MdnsConfig, MdnsEvent, TokioMdns}, swarm::{Swarm, SwarmEvent}, PeerId, }; use std::error::Error; use std::time::Duration; -async fn create_swarm(config: MdnsConfig) -> Result, Box> { +async fn create_swarm(config: MdnsConfig) -> Result, Box> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); let transport = libp2p::tokio_development_transport(id_keys)?; - let behaviour = Mdns::new(config).await?; + let behaviour = TokioMdns::new(config).await?; let mut swarm = Swarm::new(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) From 328a18671619df3f842247089dff5b38168476bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sat, 9 Jul 2022 01:06:03 -0400 Subject: [PATCH 09/30] chore(protocols/mdns): update changelog --- protocols/mdns/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 7a43905ec5d..c441d248814 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -9,7 +9,7 @@ - Update to `libp2p-core` `v0.35.0`. -- fix(protocols/mdns): fix high cpu usage with tokio library. +- Fix high CPU usage with Tokio library (see [PR 2748]). # 0.38.0 From 1898a321bb7b61dad101485623e57fc2a1143e74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 14 Jul 2022 00:36:07 -0400 Subject: [PATCH 10/30] style(chat-tokio): fix formating error --- examples/chat-tokio.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 111aeb6a6e8..aa26ab76076 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -42,9 +42,9 @@ use libp2p::{ floodsub::{self, Floodsub, FloodsubEvent}, identity, mdns::{ + MdnsEvent, // `MdnsTokio` is available through the `mdns-tokio` feature. - TokioMdns, - MdnsEvent + TokioMdns, }, mplex, noise, From 57bd2db7ce25249e01a73c6f18920e5ee2b60d3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 14 Jul 2022 00:40:40 -0400 Subject: [PATCH 11/30] chore(protocols/mdns): use deps syntax --- protocols/mdns/Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index a754d7db2b1..10883a82ec0 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -24,19 +24,19 @@ smallvec = "1.6.1" socket2 = { version = "0.4.0", features = ["all"] } void = "1.0.2" -async-io-crate = { package = "async-io", version = "1.3.1", optional = true } -tokio-crate = { package = "tokio", version = "1.19", default-features = false, features = ["net", "time"], optional = true} +async-io = { version = "1.3.1", optional = true } +tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true} [features] default = ["async-io"] -tokio = ["tokio-crate"] -async-io = ["async-io-crate"] +tokio = ["dep:tokio"] +async-io = ["dep:async-io"] [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } env_logger = "0.9.0" libp2p = { path = "../..", default-features = false, features = ["mdns-async-io", "tcp-async-io", "dns-async-std", "tcp-tokio", "dns-tokio", "websocket", "noise", "mplex", "yamux"] } -tokio-crate = {package = "tokio", version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } +tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } [[test]] From cfb9a9e29609f6feb54e3fea42cdd24f8bebb62b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 14 Jul 2022 01:01:29 -0400 Subject: [PATCH 12/30] fix(protocols/mdns): clean the code --- protocols/mdns/src/behaviour.rs | 14 +--- protocols/mdns/src/behaviour/iface.rs | 41 +++++----- protocols/mdns/src/behaviour/socket.rs | 109 ++++++++----------------- protocols/mdns/src/behaviour/timer.rs | 32 ++++---- protocols/mdns/tests/use-tokio.rs | 2 - 5 files changed, 75 insertions(+), 123 deletions(-) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 410e3d61337..310bd805cc2 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -23,11 +23,10 @@ mod socket; mod timer; use self::iface::InterfaceState; - use crate::behaviour::{socket::AsyncSocket, timer::TimerBuilder}; - use crate::MdnsConfig; use futures::prelude::*; +use futures::Stream; use if_watch::{IfEvent, IfWatcher}; use libp2p_core::transport::ListenerId; use libp2p_core::{Multiaddr, PeerId}; @@ -56,11 +55,7 @@ pub type TokioMdns = GenMdns; /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. #[derive(Debug)] -pub struct GenMdns -where - T: TimerBuilder, - S: AsyncSocket, -{ +pub struct GenMdns { /// InterfaceState config. config: MdnsConfig, @@ -85,7 +80,6 @@ where impl GenMdns where T: TimerBuilder, - S: AsyncSocket, { /// Builds a new `Mdns` behaviour. pub async fn new(config: MdnsConfig) -> io::Result { @@ -123,7 +117,7 @@ where impl NetworkBehaviour for GenMdns where - T: TimerBuilder, + T: TimerBuilder + Stream, S: AsyncSocket, { type ConnectionHandler = DummyConnectionHandler; @@ -250,7 +244,7 @@ where } if let Some(closest_expiration) = closest_expiration { let mut timer = T::at(closest_expiration); - let _ = Pin::new(&mut timer).poll_tick(cx); + let _ = Pin::new(&mut timer).poll_next(cx); self.closest_expiration = Some(timer); } diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 3c73f8e3546..5891c2994f7 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -23,10 +23,8 @@ mod query; use self::dns::{build_query, build_query_response, build_service_discovery_response}; use self::query::MdnsPacket; -use crate::MdnsConfig; - use crate::behaviour::{socket::AsyncSocket, timer::TimerBuilder}; - +use crate::MdnsConfig; use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_swarm::PollParameters; use socket2::{Domain, Socket, Type}; @@ -42,11 +40,7 @@ use std::{ /// An mDNS instance for a networking interface. To discover all peers when having multiple /// interfaces an [`InterfaceState`] is required for each interface. #[derive(Debug)] -pub struct InterfaceState -where - U: AsyncSocket, - T: TimerBuilder, -{ +pub struct InterfaceState { /// Address this instance is bound to. addr: IpAddr, /// Receive socket. @@ -77,7 +71,7 @@ where impl InterfaceState where U: AsyncSocket, - T: TimerBuilder, + T: TimerBuilder + futures::Stream, { /// Builds a new [`InterfaceState`]. pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result { @@ -209,32 +203,41 @@ where .recv_socket .poll_receive_packet(cx, &mut self.recv_buffer) { - Poll::Ready(Some((len, from))) => { + Poll::Ready(Ok(Some((len, from)))) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { self.inject_mdns_packet(packet, params); } } - Poll::Ready(None) | Poll::Pending => {} + Poll::Ready(Ok(None)) => {} + Poll::Ready(Err(err)) => { + log::error!("failed reading datagram: {}", err); + } + Poll::Pending => {} } // Send responses. - let mut s_buffer = VecDeque::new(); while let Some(packet) = self.send_buffer.pop_front() { match self.send_socket.poll_send_packet( cx, &packet, SocketAddr::new(self.multicast_addr, 5353), ) { - Poll::Ready(_) => log::trace!("sent packet on iface {}", self.addr), - Poll::Pending => s_buffer.push_front(packet), + Poll::Ready(Ok(_)) => { + log::trace!("sent packet on iface {}", self.addr) + } + Poll::Ready(Err(err)) => { + log::error!("error sending packet on iface {} {}", self.addr, err); + self.send_buffer.push_front(packet); + break; + } + Poll::Pending => { + self.send_buffer.push_front(packet); + break; + } } } - if !s_buffer.is_empty() { - self.send_buffer = s_buffer; - } - - if Pin::new(&mut self.timeout).poll_tick(cx).is_ready() { + if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { log::trace!("sending query on iface {}", self.addr); self.send_buffer.push_back(build_query()); } diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index 929078d25e5..8a288225baf 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use std::{ + io::Error, net::{SocketAddr, UdpSocket}, task::{Context, Poll}, }; @@ -35,20 +36,21 @@ pub trait AsyncSocket: Send + 'static { &mut self, _cx: &mut Context, _buf: &mut [u8], - ) -> Poll> { - Poll::Pending - } + ) -> Poll, Error>>; /// Attempts to send data on the socket to a given address. - fn poll_send_packet(&mut self, _cx: &mut Context, _packet: &[u8], _to: SocketAddr) -> Poll<()> { - Poll::Pending - } + fn poll_send_packet( + &mut self, + _cx: &mut Context, + _packet: &[u8], + _to: SocketAddr, + ) -> Poll>; } #[cfg(feature = "async-io")] pub mod asio { use super::*; - use async_io_crate::Async; + use async_io::Async; use futures::FutureExt; /// AsyncIo UdpSocket @@ -65,23 +67,14 @@ pub mod asio { &mut self, cx: &mut Context, buf: &mut [u8], - ) -> Poll> { + ) -> Poll, Error>> { // Poll receive socket. - if self.poll_readable(cx).is_ready() { - match self.recv_from(buf).now_or_never() { - Some(Ok((len, from))) => { - return Poll::Ready(Some((len, from))); - } - Some(Err(err)) => { - log::error!("Failed reading datagram: {}", err); - return Poll::Ready(None); - } - None => { - return Poll::Ready(None); - } - } + let _ = futures::ready!(self.poll_readable(cx)); + match self.recv_from(buf).now_or_never() { + Some(Ok((len, from))) => Poll::Ready(Ok(Some((len, from)))), + Some(Err(err)) => Poll::Ready(Err(err)), + None => Poll::Ready(Ok(None)), } - Poll::Pending } /// Attempts to send data on the socket to a given address. @@ -90,24 +83,13 @@ pub mod asio { cx: &mut Context, packet: &[u8], to: SocketAddr, - ) -> Poll<()> { - if self.poll_writable(cx).is_ready() { - match self.send_to(packet, to).now_or_never() { - Some(Ok(_)) => { - log::trace!("sent packet on iface {}", to); - return Poll::Ready(()); - } - Some(Err(err)) => { - log::error!("error sending packet on iface {}: {}", to, err); - return Poll::Ready(()); - } - None => { - return Poll::Pending; - } - } + ) -> Poll> { + let _ = futures::ready!(self.poll_writable(cx)); + match self.send_to(packet, to).now_or_never() { + Some(Ok(_)) => Poll::Ready(Ok(())), + Some(Err(err)) => Poll::Ready(Err(err)), + None => Poll::Pending, } - - Poll::Pending } } } @@ -115,7 +97,7 @@ pub mod asio { #[cfg(feature = "tokio")] pub mod tokio { use super::*; - use tokio_crate::net::UdpSocket as TkUdpSocket; + use ::tokio::net::UdpSocket as TkUdpSocket; /// Tokio ASync Socket` pub type TokioUdpSocket = TkUdpSocket; @@ -132,28 +114,18 @@ pub mod tokio { &mut self, cx: &mut Context, buf: &mut [u8], - ) -> Poll> { + ) -> Poll, Error>> { match self.poll_recv_ready(cx) { Poll::Ready(Ok(_)) => match self.try_recv_from(buf) { - Ok((len, from)) => { - return Poll::Ready(Some((len, from))); - } + Ok((len, from)) => Poll::Ready(Ok(Some((len, from)))), Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { - return Poll::Ready(None); - } - Err(err) => { - log::error!("Failed reading datagram: {}", err); - return Poll::Ready(None); + Poll::Ready(Ok(None)) } + Err(err) => Poll::Ready(Err(err)), }, - Poll::Ready(Err(e)) => { - log::error!("Failed recv ready datagram: {}", e); - return Poll::Ready(None); - } - _ => {} + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + _ => Poll::Pending, } - - Poll::Pending } /// Attempts to send data on the socket to a given address. @@ -162,29 +134,16 @@ pub mod tokio { cx: &mut Context, packet: &[u8], to: SocketAddr, - ) -> Poll<()> { + ) -> Poll> { match self.poll_send_ready(cx) { Poll::Ready(Ok(_)) => match self.try_send_to(packet, to) { - Ok(_len) => { - log::trace!("sent packet on iface {}", to); - return Poll::Ready(()); - } - Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { - return Poll::Ready(()); - } - Err(err) => { - log::error!("Failed reading datagram: {}", err); - return Poll::Ready(()); - } + Ok(_len) => Poll::Ready(Ok(())), + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(err)), }, - Poll::Ready(Err(e)) => { - log::error!("Failed recv ready datagram: {}", e); - return Poll::Ready(()); - } - _ => {} + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + _ => Poll::Pending, } - - Poll::Pending } } } diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index 9d327e1af4a..8bf71194c99 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -20,6 +20,7 @@ use std::{ marker::Unpin, + pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -32,8 +33,6 @@ pub struct WrapTimer { /// Builder interface to homogenize the differents implementations pub trait TimerBuilder: Send + Unpin + 'static { - type Item; - /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self; @@ -42,24 +41,18 @@ pub trait TimerBuilder: Send + Unpin + 'static { /// Creates a timer that emits events periodically, starting at start. fn interval_at(start: Instant, duration: Duration) -> Self; - - /// Poll the timer - fn poll_tick(&mut self, cx: &mut Context) -> Poll; } #[cfg(feature = "async-io")] pub mod asio { use super::*; - use async_io_crate::Timer; + use async_io::Timer; use futures::Stream; - use std::pin::Pin; /// Async Timer pub type AsyncTimer = WrapTimer; impl TimerBuilder for WrapTimer { - type Item = Option; - /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self { WrapTimer { @@ -80,9 +73,12 @@ pub mod asio { timer: Timer::interval_at(start, duration), } } + } + + impl Stream for AsyncTimer { + type Item = Instant; - /// Poll the timer - fn poll_tick(&mut self, cx: &mut Context) -> Poll { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.timer).poll_next(cx) } } @@ -91,14 +87,13 @@ pub mod asio { #[cfg(feature = "tokio")] pub mod tokio { use super::*; - use tokio_crate::time::{self, Instant as TokioInstant, Interval}; + use ::tokio::time::{self, Instant as TokioInstant, Interval}; + use futures::Stream; /// Tokio wrapper pub type TokioTimer = WrapTimer; impl TimerBuilder for WrapTimer { - type Item = time::Instant; - /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self { // Taken from: https://docs.rs/async-io/1.7.0/src/async_io/lib.rs.html#91 @@ -122,10 +117,13 @@ pub mod tokio { timer: time::interval_at(TokioInstant::from_std(start), duration), } } + } + + impl Stream for TokioTimer { + type Item = TokioInstant; - /// Poll the timer - fn poll_tick(&mut self, cx: &mut Context) -> Poll { - self.timer.poll_tick(cx) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.timer.poll_tick(cx).map(Some) } } } diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index cdc984a3597..c867a30a47b 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -17,8 +17,6 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE.use futures::StreamExt; -extern crate tokio_crate as tokio; - use futures::StreamExt; use libp2p::{ identity, From c57ee57e733908a20d0a6618bb4b8eea4017e4ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 14 Jul 2022 22:29:25 -0400 Subject: [PATCH 13/30] fix(examples/chat-tokio): fix typo --- examples/chat-tokio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index aa26ab76076..35c8eddb79e 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -43,7 +43,7 @@ use libp2p::{ identity, mdns::{ MdnsEvent, - // `MdnsTokio` is available through the `mdns-tokio` feature. + // `TokioMdns` is available through the `mdns-tokio` feature. TokioMdns, }, mplex, From f20dbbe7252976625788e2d17009aed365067dbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 14 Jul 2022 22:33:39 -0400 Subject: [PATCH 14/30] chore(protocols/mdns): add pr link to changelog --- protocols/mdns/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index c441d248814..576c2a081b2 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -9,7 +9,7 @@ - Update to `libp2p-core` `v0.35.0`. -- Fix high CPU usage with Tokio library (see [PR 2748]). +- Fix high CPU usage with Tokio library (see [PR 2748](https://github.com/libp2p/rust-libp2p/pull/2748) ). # 0.38.0 From 6e1c8e5fb699b9d8c26ed5382e4f032f136bc4a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sat, 16 Jul 2022 23:44:38 -0400 Subject: [PATCH 15/30] chore(protocols/mdns): fix changelog --- protocols/mdns/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 576c2a081b2..e33a81509e5 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -9,7 +9,9 @@ - Update to `libp2p-core` `v0.35.0`. -- Fix high CPU usage with Tokio library (see [PR 2748](https://github.com/libp2p/rust-libp2p/pull/2748) ). +- Fix high CPU usage with Tokio library (see [PR 2748]). + +[PR 2748]: https://github.com/libp2p/rust-libp2p/pull/2748 # 0.38.0 From beeb1a15d8e85e4a5b8574b3bf062c24311e9023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sun, 17 Jul 2022 16:46:01 -0400 Subject: [PATCH 16/30] fix(protocols/mdns): rename socket functions --- protocols/mdns/src/behaviour/iface.rs | 10 +++++----- protocols/mdns/src/behaviour/socket.rs | 24 +++++++++--------------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 5891c2994f7..33937e3378e 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -86,7 +86,7 @@ where socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(255)?; socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; - U::from_socket(UdpSocket::from(socket))? + U::from_std(UdpSocket::from(socket))? } IpAddr::V6(_) => { let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?; @@ -97,7 +97,7 @@ where socket.set_multicast_loop_v6(true)?; // TODO: find interface matching addr. socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; - U::from_socket(UdpSocket::from(socket))? + U::from_std(UdpSocket::from(socket))? } }; let bind_addr = match addr { @@ -111,7 +111,7 @@ where } }; let std_socket = UdpSocket::bind(bind_addr)?; - let send_socket = U::from_socket(std_socket)?; + let send_socket = U::from_std(std_socket)?; // randomize timer to prevent all converging and firing at the same time. let query_interval = { @@ -201,7 +201,7 @@ where // Poll receive socket. match self .recv_socket - .poll_receive_packet(cx, &mut self.recv_buffer) + .poll_read(cx, &mut self.recv_buffer) { Poll::Ready(Ok(Some((len, from)))) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { @@ -217,7 +217,7 @@ where // Send responses. while let Some(packet) = self.send_buffer.pop_front() { - match self.send_socket.poll_send_packet( + match self.send_socket.poll_write( cx, &packet, SocketAddr::new(self.multicast_addr, 5353), diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index 8a288225baf..c4c5d0fc856 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -27,19 +27,19 @@ use std::{ /// Interface that must be implemented by the different runtimes to use the UdpSocket in async mode pub trait AsyncSocket: Send + 'static { /// Create the async socket from the ```std::net::UdpSocket``` - fn from_socket(socket: UdpSocket) -> std::io::Result + fn from_std(socket: UdpSocket) -> std::io::Result where Self: Sized; /// Attempts to receive a single packet on the socket from the remote address to which it is connected. - fn poll_receive_packet( + fn poll_read( &mut self, _cx: &mut Context, _buf: &mut [u8], ) -> Poll, Error>>; /// Attempts to send data on the socket to a given address. - fn poll_send_packet( + fn poll_write( &mut self, _cx: &mut Context, _packet: &[u8], @@ -57,13 +57,11 @@ pub mod asio { pub type AsyncUdpSocket = Async; impl AsyncSocket for AsyncUdpSocket { - /// Create the async socket from the ```std::net::UdpSocket``` - fn from_socket(socket: UdpSocket) -> std::io::Result { + fn from_std(socket: UdpSocket) -> std::io::Result { Async::new(socket) } - /// Attempts to receive a single packet on the socket from the remote address to which it is connected. - fn poll_receive_packet( + fn poll_read( &mut self, cx: &mut Context, buf: &mut [u8], @@ -77,8 +75,7 @@ pub mod asio { } } - /// Attempts to send data on the socket to a given address. - fn poll_send_packet( + fn poll_write( &mut self, cx: &mut Context, packet: &[u8], @@ -103,14 +100,12 @@ pub mod tokio { pub type TokioUdpSocket = TkUdpSocket; impl AsyncSocket for TokioUdpSocket { - /// Create the async socket from the ```std::net::UdpSocket``` - fn from_socket(socket: UdpSocket) -> std::io::Result { + fn from_std(socket: UdpSocket) -> std::io::Result { socket.set_nonblocking(true)?; TokioUdpSocket::from_std(socket) } - /// Attempts to receive a single packet on the socket from the remote address to which it is connected. - fn poll_receive_packet( + fn poll_read( &mut self, cx: &mut Context, buf: &mut [u8], @@ -128,8 +123,7 @@ pub mod tokio { } } - /// Attempts to send data on the socket to a given address. - fn poll_send_packet( + fn poll_write( &mut self, cx: &mut Context, packet: &[u8], From 40c84859b62f9ec2ccccc405fe957d23e5deee11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sun, 17 Jul 2022 17:22:50 -0400 Subject: [PATCH 17/30] fix(protocols/mdns): rename TimerBuilder trait --- protocols/mdns/src/behaviour.rs | 6 +++--- protocols/mdns/src/behaviour/iface.rs | 9 +++------ protocols/mdns/src/behaviour/timer.rs | 14 ++++---------- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 310bd805cc2..854bd885a22 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -23,7 +23,7 @@ mod socket; mod timer; use self::iface::InterfaceState; -use crate::behaviour::{socket::AsyncSocket, timer::TimerBuilder}; +use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::MdnsConfig; use futures::prelude::*; use futures::Stream; @@ -79,7 +79,7 @@ pub struct GenMdns { impl GenMdns where - T: TimerBuilder, + T: Builder, { /// Builds a new `Mdns` behaviour. pub async fn new(config: MdnsConfig) -> io::Result { @@ -117,7 +117,7 @@ where impl NetworkBehaviour for GenMdns where - T: TimerBuilder + Stream, + T: Builder + Stream, S: AsyncSocket, { type ConnectionHandler = DummyConnectionHandler; diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 33937e3378e..607d5de1001 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -23,7 +23,7 @@ mod query; use self::dns::{build_query, build_query_response, build_service_discovery_response}; use self::query::MdnsPacket; -use crate::behaviour::{socket::AsyncSocket, timer::TimerBuilder}; +use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::MdnsConfig; use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_swarm::PollParameters; @@ -71,7 +71,7 @@ pub struct InterfaceState { impl InterfaceState where U: AsyncSocket, - T: TimerBuilder + futures::Stream, + T: Builder + futures::Stream, { /// Builds a new [`InterfaceState`]. pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result { @@ -199,10 +199,7 @@ where params: &impl PollParameters, ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. - match self - .recv_socket - .poll_read(cx, &mut self.recv_buffer) - { + match self.recv_socket.poll_read(cx, &mut self.recv_buffer) { Poll::Ready(Ok(Some((len, from)))) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { self.inject_mdns_packet(packet, params); diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index 8bf71194c99..4356b8f860b 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -32,7 +32,7 @@ pub struct WrapTimer { } /// Builder interface to homogenize the differents implementations -pub trait TimerBuilder: Send + Unpin + 'static { +pub trait Builder: Send + Unpin + 'static { /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self; @@ -52,22 +52,19 @@ pub mod asio { /// Async Timer pub type AsyncTimer = WrapTimer; - impl TimerBuilder for WrapTimer { - /// Creates a timer that emits an event once at the given time instant. + impl Builder for WrapTimer { fn at(instant: Instant) -> Self { WrapTimer { timer: Timer::at(instant), } } - /// Creates a timer that emits events periodically. fn interval(duration: Duration) -> Self { WrapTimer { timer: Timer::interval(duration), } } - /// Creates a timer that emits events periodically, starting at start. fn interval_at(start: Instant, duration: Duration) -> Self { WrapTimer { timer: Timer::interval_at(start, duration), @@ -78,7 +75,7 @@ pub mod asio { impl Stream for AsyncTimer { type Item = Instant; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.timer).poll_next(cx) } } @@ -93,8 +90,7 @@ pub mod tokio { /// Tokio wrapper pub type TokioTimer = WrapTimer; - impl TimerBuilder for WrapTimer { - /// Creates a timer that emits an event once at the given time instant. + impl Builder for WrapTimer { fn at(instant: Instant) -> Self { // Taken from: https://docs.rs/async-io/1.7.0/src/async_io/lib.rs.html#91 let timer = time::interval_at( @@ -104,14 +100,12 @@ pub mod tokio { WrapTimer { timer } } - /// Creates a timer that emits events periodically. fn interval(duration: Duration) -> Self { WrapTimer { timer: time::interval(duration), } } - /// Creates a timer that emits events periodically, starting at start. fn interval_at(start: Instant, duration: Duration) -> Self { WrapTimer { timer: time::interval_at(TokioInstant::from_std(start), duration), From f57a3754da660d75ab8f82138f5fc17595436701 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sun, 17 Jul 2022 17:33:22 -0400 Subject: [PATCH 18/30] fix(protocols/mdns): add cfg_attr docsr --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 6612984af2b..3527cc3ea0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,6 +80,7 @@ pub use libp2p_identify as identify; #[doc(inline)] pub use libp2p_kad as kad; #[cfg(any(feature = "mdns-async-io", feature = "mdns-tokio"))] +#[cfg_attr(docsrs, doc(cfg(any(feature = "mdns-tokio", feature = "mdns-async-io"))))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] pub use libp2p_mdns as mdns; From c1de5bc4e1fd7d65b7b385f7d74a730cb00225db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sun, 17 Jul 2022 17:43:07 -0400 Subject: [PATCH 19/30] fix(protocols/mdns): update mdns tests --- protocols/mdns/tests/use-async-std.rs | 58 +++++++++++++-------------- protocols/mdns/tests/use-tokio.rs | 56 +++++++++++++------------- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/protocols/mdns/tests/use-async-std.rs b/protocols/mdns/tests/use-async-std.rs index 66ba28b5643..683aed338ce 100644 --- a/protocols/mdns/tests/use-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -28,6 +28,35 @@ use libp2p::{ use std::error::Error; use std::time::Duration; +#[async_std::test] +async fn test_discovery_async_std_ipv4() -> Result<(), Box> { + run_discovery_test(MdnsConfig::default()).await +} + +#[async_std::test] +async fn test_discovery_async_std_ipv6() -> Result<(), Box> { + let config = MdnsConfig { + enable_ipv6: true, + ..Default::default() + }; + run_discovery_test(config).await +} + +#[async_std::test] +async fn test_expired_async_std() -> Result<(), Box> { + env_logger::try_init().ok(); + let config = MdnsConfig { + ttl: Duration::from_secs(1), + query_interval: Duration::from_secs(10), + ..Default::default() + }; + + async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) +} + async fn create_swarm(config: MdnsConfig) -> Result, Box> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); @@ -78,20 +107,6 @@ async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { } } -#[async_std::test] -async fn test_discovery_async_std_ipv4() -> Result<(), Box> { - run_discovery_test(MdnsConfig::default()).await -} - -#[async_std::test] -async fn test_discovery_async_std_ipv6() -> Result<(), Box> { - let config = MdnsConfig { - enable_ipv6: true, - ..Default::default() - }; - run_discovery_test(config).await -} - async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; @@ -122,18 +137,3 @@ async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box Result<(), Box> { - env_logger::try_init().ok(); - let config = MdnsConfig { - ttl: Duration::from_secs(1), - query_interval: Duration::from_secs(10), - ..Default::default() - }; - - async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) - .await - .map(|_| ()) - .map_err(|e| Box::new(e) as Box) -} diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index c867a30a47b..3b6c661adcb 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -27,6 +27,34 @@ use libp2p::{ use std::error::Error; use std::time::Duration; +#[tokio::test] +async fn test_discovery_tokio_ipv4() -> Result<(), Box> { + run_discovery_test(MdnsConfig::default()).await +} + +#[tokio::test] +async fn test_discovery_tokio_ipv6() -> Result<(), Box> { + let config = MdnsConfig { + enable_ipv6: true, + ..Default::default() + }; + run_discovery_test(config).await +} + +#[tokio::test] +async fn test_expired_tokio() -> Result<(), Box> { + env_logger::try_init().ok(); + let config = MdnsConfig { + ttl: Duration::from_secs(1), + query_interval: Duration::from_secs(10), + ..Default::default() + }; + + tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .unwrap() +} + async fn create_swarm(config: MdnsConfig) -> Result, Box> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); @@ -77,20 +105,6 @@ async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { } } -#[tokio::test] -async fn test_discovery_tokio_ipv4() -> Result<(), Box> { - run_discovery_test(MdnsConfig::default()).await -} - -#[tokio::test] -async fn test_discovery_tokio_ipv6() -> Result<(), Box> { - let config = MdnsConfig { - enable_ipv6: true, - ..Default::default() - }; - run_discovery_test(config).await -} - async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; @@ -121,17 +135,3 @@ async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box Result<(), Box> { - env_logger::try_init().ok(); - let config = MdnsConfig { - ttl: Duration::from_secs(1), - query_interval: Duration::from_secs(10), - ..Default::default() - }; - - tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) - .await - .unwrap() -} From 5b21637b69a1059b21ace966c96a6a88ef16f349 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sun, 17 Jul 2022 17:45:32 -0400 Subject: [PATCH 20/30] chore(protocols/mdns): add fmt --- src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 3527cc3ea0e..3ed00408cb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,7 +80,10 @@ pub use libp2p_identify as identify; #[doc(inline)] pub use libp2p_kad as kad; #[cfg(any(feature = "mdns-async-io", feature = "mdns-tokio"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "mdns-tokio", feature = "mdns-async-io"))))] +#[cfg_attr( + docsrs, + doc(cfg(any(feature = "mdns-tokio", feature = "mdns-async-io"))) +)] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] pub use libp2p_mdns as mdns; From c142bdee23f43214879242e20935d7dd7367b23a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Sun, 17 Jul 2022 23:25:56 -0400 Subject: [PATCH 21/30] fix(protocols/mdns): update socket and timer impl --- protocols/mdns/src/behaviour/iface.rs | 37 +++++++++++++----------- protocols/mdns/src/behaviour/socket.rs | 40 +++++++++----------------- protocols/mdns/src/behaviour/timer.rs | 40 +++++++++++++------------- 3 files changed, 54 insertions(+), 63 deletions(-) diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 607d5de1001..98752fe36d6 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -200,15 +200,20 @@ where ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. match self.recv_socket.poll_read(cx, &mut self.recv_buffer) { - Poll::Ready(Ok(Some((len, from)))) => { - if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { - self.inject_mdns_packet(packet, params); + Poll::Ready(result) => match result { + Ok((len, from)) => { + if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) + { + self.inject_mdns_packet(packet, params); + } } - } - Poll::Ready(Ok(None)) => {} - Poll::Ready(Err(err)) => { - log::error!("failed reading datagram: {}", err); - } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + // Not more bytes available on the socket to read + } + Err(err) => { + log::error!("failed reading datagram: {}", err); + } + }, Poll::Pending => {} } @@ -219,14 +224,14 @@ where &packet, SocketAddr::new(self.multicast_addr, 5353), ) { - Poll::Ready(Ok(_)) => { - log::trace!("sent packet on iface {}", self.addr) - } - Poll::Ready(Err(err)) => { - log::error!("error sending packet on iface {} {}", self.addr, err); - self.send_buffer.push_front(packet); - break; - } + Poll::Ready(data) => match data { + Ok(_) => log::trace!("sent packet on iface {}", self.addr), + Err(err) => { + log::error!("error sending packet on iface {} {}", self.addr, err); + self.send_buffer.push_front(packet); + break; + } + }, Poll::Pending => { self.send_buffer.push_front(packet); break; diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index c4c5d0fc856..7e62bec5f7c 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -36,7 +36,7 @@ pub trait AsyncSocket: Send + 'static { &mut self, _cx: &mut Context, _buf: &mut [u8], - ) -> Poll, Error>>; + ) -> Poll>; /// Attempts to send data on the socket to a given address. fn poll_write( @@ -65,13 +65,12 @@ pub mod asio { &mut self, cx: &mut Context, buf: &mut [u8], - ) -> Poll, Error>> { + ) -> Poll> { // Poll receive socket. - let _ = futures::ready!(self.poll_readable(cx)); + let _ = futures::ready!(self.poll_readable(cx))?; match self.recv_from(buf).now_or_never() { - Some(Ok((len, from))) => Poll::Ready(Ok(Some((len, from)))), - Some(Err(err)) => Poll::Ready(Err(err)), - None => Poll::Ready(Ok(None)), + Some(data) => Poll::Ready(data), + None => Poll::Pending, } } @@ -81,7 +80,7 @@ pub mod asio { packet: &[u8], to: SocketAddr, ) -> Poll> { - let _ = futures::ready!(self.poll_writable(cx)); + let _ = futures::ready!(self.poll_writable(cx))?; match self.send_to(packet, to).now_or_never() { Some(Ok(_)) => Poll::Ready(Ok(())), Some(Err(err)) => Poll::Ready(Err(err)), @@ -109,18 +108,9 @@ pub mod tokio { &mut self, cx: &mut Context, buf: &mut [u8], - ) -> Poll, Error>> { - match self.poll_recv_ready(cx) { - Poll::Ready(Ok(_)) => match self.try_recv_from(buf) { - Ok((len, from)) => Poll::Ready(Ok(Some((len, from)))), - Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { - Poll::Ready(Ok(None)) - } - Err(err) => Poll::Ready(Err(err)), - }, - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - _ => Poll::Pending, - } + ) -> Poll> { + let _ = futures::ready!(self.poll_recv_ready(cx))?; + Poll::Ready(self.try_recv_from(buf)) } fn poll_write( @@ -129,14 +119,10 @@ pub mod tokio { packet: &[u8], to: SocketAddr, ) -> Poll> { - match self.poll_send_ready(cx) { - Poll::Ready(Ok(_)) => match self.try_send_to(packet, to) { - Ok(_len) => Poll::Ready(Ok(())), - Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Ready(Ok(())), - Err(err) => Poll::Ready(Err(err)), - }, - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - _ => Poll::Pending, + let _ = futures::ready!(self.poll_send_ready(cx))?; + match self.try_send_to(packet, to) { + Ok(_len) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(err)), } } } diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index 4356b8f860b..fe9286e25cf 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -27,8 +27,8 @@ use std::{ /// Simple wrapper for the differents type of timers #[derive(Debug)] -pub struct WrapTimer { - timer: T, +pub struct Timer { + inner: T, } /// Builder interface to homogenize the differents implementations @@ -46,28 +46,28 @@ pub trait Builder: Send + Unpin + 'static { #[cfg(feature = "async-io")] pub mod asio { use super::*; - use async_io::Timer; + use async_io::Timer as AsioTimer; use futures::Stream; /// Async Timer - pub type AsyncTimer = WrapTimer; + pub type AsyncTimer = Timer; - impl Builder for WrapTimer { + impl Builder for AsyncTimer { fn at(instant: Instant) -> Self { - WrapTimer { - timer: Timer::at(instant), + Self { + inner: AsioTimer::at(instant), } } fn interval(duration: Duration) -> Self { - WrapTimer { - timer: Timer::interval(duration), + Self { + inner: AsioTimer::interval(duration), } } fn interval_at(start: Instant, duration: Duration) -> Self { - WrapTimer { - timer: Timer::interval_at(start, duration), + Self { + inner: AsioTimer::interval_at(start, duration), } } } @@ -76,7 +76,7 @@ pub mod asio { type Item = Instant; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.timer).poll_next(cx) + Pin::new(&mut self.inner).poll_next(cx) } } } @@ -88,27 +88,27 @@ pub mod tokio { use futures::Stream; /// Tokio wrapper - pub type TokioTimer = WrapTimer; + pub type TokioTimer = Timer; - impl Builder for WrapTimer { + impl Builder for TokioTimer { fn at(instant: Instant) -> Self { // Taken from: https://docs.rs/async-io/1.7.0/src/async_io/lib.rs.html#91 let timer = time::interval_at( TokioInstant::from_std(instant), Duration::new(std::u64::MAX, 1_000_000_000 - 1), ); - WrapTimer { timer } + Self { inner: timer } } fn interval(duration: Duration) -> Self { - WrapTimer { - timer: time::interval(duration), + Timer { + inner: time::interval(duration), } } fn interval_at(start: Instant, duration: Duration) -> Self { - WrapTimer { - timer: time::interval_at(TokioInstant::from_std(start), duration), + Timer { + inner: time::interval_at(TokioInstant::from_std(start), duration), } } } @@ -117,7 +117,7 @@ pub mod tokio { type Item = TokioInstant; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.timer.poll_tick(cx).map(Some) + self.inner.poll_tick(cx).map(Some) } } } From f80f93ed8ba9d482a3edab6d60700a10efbe0713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Mon, 18 Jul 2022 14:58:07 -0400 Subject: [PATCH 22/30] fix(protocols/mdns): fix implementation logic and style --- protocols/mdns/src/behaviour/iface.rs | 51 ++++++++++++++------------ protocols/mdns/src/behaviour/socket.rs | 8 ++-- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 98752fe36d6..901aa76abd4 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -110,8 +110,7 @@ where SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0) } }; - let std_socket = UdpSocket::bind(bind_addr)?; - let send_socket = U::from_std(std_socket)?; + let send_socket = U::from_std(UdpSocket::bind(bind_addr)?)?; // randomize timer to prevent all converging and firing at the same time. let query_interval = { @@ -176,13 +175,16 @@ where let new_expiration = Instant::now() + peer.ttl(); for addr in peer.addresses() { - let da = if let Some(new_addr) = address_translation(addr, &observed) { - new_addr.clone() - } else { - addr.clone() - }; - - self.discovered.push_back((*peer.id(), da, new_expiration)); + if let Some(new_addr) = address_translation(addr, &observed) { + self.discovered.push_back(( + *peer.id(), + new_addr.clone(), + new_expiration, + )); + } + + self.discovered + .push_back((*peer.id(), addr.clone(), new_expiration)); } } } @@ -199,22 +201,26 @@ where params: &impl PollParameters, ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. - match self.recv_socket.poll_read(cx, &mut self.recv_buffer) { - Poll::Ready(result) => match result { - Ok((len, from)) => { + loop { + match self.recv_socket.poll_read(cx, &mut self.recv_buffer) { + Poll::Ready(Ok((len, from))) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { self.inject_mdns_packet(packet, params); } } - Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { // Not more bytes available on the socket to read + break; } - Err(err) => { + Poll::Ready(Err(err)) => { log::error!("failed reading datagram: {}", err); + break; + } + Poll::Pending => { + break; } - }, - Poll::Pending => {} + } } // Send responses. @@ -224,14 +230,11 @@ where &packet, SocketAddr::new(self.multicast_addr, 5353), ) { - Poll::Ready(data) => match data { - Ok(_) => log::trace!("sent packet on iface {}", self.addr), - Err(err) => { - log::error!("error sending packet on iface {} {}", self.addr, err); - self.send_buffer.push_front(packet); - break; - } - }, + Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr), + Poll::Ready(Err(err)) => { + log::error!("error sending packet on iface {} {}", self.addr, err); + break; + } Poll::Pending => { self.send_buffer.push_front(packet); break; diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index 7e62bec5f7c..e44dfea2638 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -67,7 +67,7 @@ pub mod asio { buf: &mut [u8], ) -> Poll> { // Poll receive socket. - let _ = futures::ready!(self.poll_readable(cx))?; + futures::ready!(self.poll_readable(cx))?; match self.recv_from(buf).now_or_never() { Some(data) => Poll::Ready(data), None => Poll::Pending, @@ -80,7 +80,7 @@ pub mod asio { packet: &[u8], to: SocketAddr, ) -> Poll> { - let _ = futures::ready!(self.poll_writable(cx))?; + futures::ready!(self.poll_writable(cx))?; match self.send_to(packet, to).now_or_never() { Some(Ok(_)) => Poll::Ready(Ok(())), Some(Err(err)) => Poll::Ready(Err(err)), @@ -109,7 +109,7 @@ pub mod tokio { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let _ = futures::ready!(self.poll_recv_ready(cx))?; + futures::ready!(self.poll_recv_ready(cx))?; Poll::Ready(self.try_recv_from(buf)) } @@ -119,7 +119,7 @@ pub mod tokio { packet: &[u8], to: SocketAddr, ) -> Poll> { - let _ = futures::ready!(self.poll_send_ready(cx))?; + futures::ready!(self.poll_send_ready(cx))?; match self.try_send_to(packet, to) { Ok(_len) => Poll::Ready(Ok(())), Err(err) => Poll::Ready(Err(err)), From e19f2c38e78f043b6810e8d20fd3aafee620e202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Mon, 18 Jul 2022 23:06:09 -0400 Subject: [PATCH 23/30] doc(protocols/mdns): fix documentation --- examples/chat-tokio.rs | 2 +- protocols/mdns/CHANGELOG.md | 4 ++++ protocols/mdns/src/behaviour/iface.rs | 2 +- protocols/mdns/src/behaviour/socket.rs | 4 ++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 35c8eddb79e..f82d30934c9 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -25,7 +25,7 @@ //! The example is run per node as follows: //! //! ```sh -//! cargo run --example chat-tokio --features="tcp-tokio mdns" +//! cargo run --example chat-tokio --features="tcp-tokio mdns-tokio" //! ``` //! //! Alternatively, to run with the minimal set of features and crates: diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index e33a81509e5..61fa9770329 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -7,6 +7,10 @@ - Update to `libp2p-swarm` `v0.38.0`. - Update to `if-watch` `v1.1.1`. +- Allow users to choose between async-io and tokio runtime + in the mdns protocol implementation. `async-io` is a default + feature, with an additional `tokyo` feature (see [PR 2748]) + - Update to `libp2p-core` `v0.35.0`. - Fix high CPU usage with Tokio library (see [PR 2748]). diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 901aa76abd4..c23175c97b7 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -210,7 +210,7 @@ where } } Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { - // Not more bytes available on the socket to read + // No more bytes available on the socket to read break; } Poll::Ready(Err(err)) => { diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index e44dfea2638..86d5ec97c12 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -24,9 +24,9 @@ use std::{ task::{Context, Poll}, }; -/// Interface that must be implemented by the different runtimes to use the UdpSocket in async mode +/// Interface that must be implemented by the different runtimes to use the [`UdpSocket`] in async mode pub trait AsyncSocket: Send + 'static { - /// Create the async socket from the ```std::net::UdpSocket``` + /// Create the async socket from the [`std::net::UdpSocket`] fn from_std(socket: UdpSocket) -> std::io::Result where Self: Sized; From 01643bb23445b64a78915413657397f6cce36847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Wed, 20 Jul 2022 08:21:21 -0400 Subject: [PATCH 24/30] fix(protocols/mdns): fix typo and logic --- protocols/mdns/CHANGELOG.md | 2 +- protocols/mdns/src/behaviour/iface.rs | 15 +++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 61fa9770329..17df24fa1f6 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -9,7 +9,7 @@ - Allow users to choose between async-io and tokio runtime in the mdns protocol implementation. `async-io` is a default - feature, with an additional `tokyo` feature (see [PR 2748]) + feature, with an additional `tokio` feature (see [PR 2748]) - Update to `libp2p-core` `v0.35.0`. diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index c23175c97b7..40ebdabac59 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -201,24 +201,20 @@ where params: &impl PollParameters, ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. - loop { - match self.recv_socket.poll_read(cx, &mut self.recv_buffer) { - Poll::Ready(Ok((len, from))) => { + while let Poll::Ready(data) = self.recv_socket.poll_read(cx, &mut self.recv_buffer) { + match data { + Ok((len, from)) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) { self.inject_mdns_packet(packet, params); } } - Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { // No more bytes available on the socket to read break; } - Poll::Ready(Err(err)) => { + Err(err) => { log::error!("failed reading datagram: {}", err); - break; - } - Poll::Pending => { - break; } } } @@ -233,7 +229,6 @@ where Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr), Poll::Ready(Err(err)) => { log::error!("error sending packet on iface {} {}", self.addr, err); - break; } Poll::Pending => { self.send_buffer.push_front(packet); From 92e219adcf752b7a5fdfa9bae9288b4797b7d91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Wed, 27 Jul 2022 16:44:14 -0400 Subject: [PATCH 25/30] fix(protocols/mdns): sockets pinned on poll --- protocols/mdns/src/behaviour/iface.rs | 6 ++++-- protocols/mdns/src/behaviour/socket.rs | 21 +++++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 40ebdabac59..c5bacced138 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -201,7 +201,9 @@ where params: &impl PollParameters, ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. - while let Poll::Ready(data) = self.recv_socket.poll_read(cx, &mut self.recv_buffer) { + while let Poll::Ready(data) = + Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer) + { match data { Ok((len, from)) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) @@ -221,7 +223,7 @@ where // Send responses. while let Some(packet) = self.send_buffer.pop_front() { - match self.send_socket.poll_write( + match Pin::new(&mut self.send_socket).poll_write( cx, &packet, SocketAddr::new(self.multicast_addr, 5353), diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index 86d5ec97c12..4406ed33fde 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -20,12 +20,13 @@ use std::{ io::Error, + marker::Unpin, net::{SocketAddr, UdpSocket}, task::{Context, Poll}, }; /// Interface that must be implemented by the different runtimes to use the [`UdpSocket`] in async mode -pub trait AsyncSocket: Send + 'static { +pub trait AsyncSocket: Unpin + Send + 'static { /// Create the async socket from the [`std::net::UdpSocket`] fn from_std(socket: UdpSocket) -> std::io::Result where @@ -93,7 +94,7 @@ pub mod asio { #[cfg(feature = "tokio")] pub mod tokio { use super::*; - use ::tokio::net::UdpSocket as TkUdpSocket; + use ::tokio::{io::ReadBuf, net::UdpSocket as TkUdpSocket}; /// Tokio ASync Socket` pub type TokioUdpSocket = TkUdpSocket; @@ -109,8 +110,12 @@ pub mod tokio { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - futures::ready!(self.poll_recv_ready(cx))?; - Poll::Ready(self.try_recv_from(buf)) + let mut rbuf = ReadBuf::new(buf); + match self.poll_recv_from(cx, &mut rbuf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Ok(addr)) => Poll::Ready(Ok((rbuf.filled().len(), addr))), + } } fn poll_write( @@ -119,10 +124,10 @@ pub mod tokio { packet: &[u8], to: SocketAddr, ) -> Poll> { - futures::ready!(self.poll_send_ready(cx))?; - match self.try_send_to(packet, to) { - Ok(_len) => Poll::Ready(Ok(())), - Err(err) => Poll::Ready(Err(err)), + match self.poll_send_to(cx, packet, to) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Ok(_len)) => Poll::Ready(Ok(())), } } } From d6045caa57f6dca9adc4e27403c7aa4dd1f4fc92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 28 Jul 2022 11:18:12 -0400 Subject: [PATCH 26/30] fix(protocols/mdns): change tokio interval missed tick behavior --- protocols/mdns/src/behaviour/timer.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index fe9286e25cf..ff0d91d4448 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -84,7 +84,7 @@ pub mod asio { #[cfg(feature = "tokio")] pub mod tokio { use super::*; - use ::tokio::time::{self, Instant as TokioInstant, Interval}; + use ::tokio::time::{self, Instant as TokioInstant, Interval, MissedTickBehavior}; use futures::Stream; /// Tokio wrapper @@ -93,23 +93,24 @@ pub mod tokio { impl Builder for TokioTimer { fn at(instant: Instant) -> Self { // Taken from: https://docs.rs/async-io/1.7.0/src/async_io/lib.rs.html#91 - let timer = time::interval_at( + let mut inner = time::interval_at( TokioInstant::from_std(instant), Duration::new(std::u64::MAX, 1_000_000_000 - 1), ); - Self { inner: timer } + inner.set_missed_tick_behavior(MissedTickBehavior::Skip); + Self { inner } } fn interval(duration: Duration) -> Self { - Timer { - inner: time::interval(duration), - } + let mut inner = time::interval(duration); + inner.set_missed_tick_behavior(MissedTickBehavior::Skip); + Self { inner } } fn interval_at(start: Instant, duration: Duration) -> Self { - Timer { - inner: time::interval_at(TokioInstant::from_std(start), duration), - } + let mut inner = time::interval_at(TokioInstant::from_std(start), duration); + inner.set_missed_tick_behavior(MissedTickBehavior::Skip); + Self { inner } } } @@ -119,5 +120,9 @@ pub mod tokio { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.poll_tick(cx).map(Some) } + + fn size_hint(&self) -> (usize, Option) { + (std::usize::MAX, None) + } } } From 393c19f9723e85fe4153bccc2c62656a56c97996 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Mon, 22 Aug 2022 16:39:06 -0400 Subject: [PATCH 27/30] refactor(mdns/test): update tokio expiration test --- protocols/mdns/tests/use-tokio.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 3b6c661adcb..9d6cacd76cb 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -50,9 +50,7 @@ async fn test_expired_tokio() -> Result<(), Box> { ..Default::default() }; - tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) - .await - .unwrap() + run_peer_expiration_test(config).await } async fn create_swarm(config: MdnsConfig) -> Result, Box> { @@ -108,9 +106,14 @@ async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; + let expired_at = tokio::time::sleep(Duration::from_secs(15)); + tokio::pin!(expired_at); loop { - futures::select! { + tokio::select! { + _ev = &mut expired_at => { + panic!(); + }, ev = a.select_next_some() => match ev { SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => { for (peer, _addr) in peers { @@ -119,6 +122,13 @@ async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box { + for (peer, _addr) in peers { + if peer == *b.local_peer_id() { + expired_at.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(2)); + } + } + } _ => {} }, ev = b.select_next_some() => match ev { @@ -129,9 +139,15 @@ async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box { + for (peer, _addr) in peers { + if peer == *a.local_peer_id() { + expired_at.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(2)); + } + } + } _ => {} } - } } } From d7e798c077a20a42554958de87e8e57c9e264d0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Wed, 24 Aug 2022 18:01:11 -0400 Subject: [PATCH 28/30] fix(protocol/mdns): change interval behavior for tokio timer impl --- protocols/mdns/src/behaviour/timer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index ff0d91d4448..fbdeb065b70 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -102,7 +102,7 @@ pub mod tokio { } fn interval(duration: Duration) -> Self { - let mut inner = time::interval(duration); + let mut inner = time::interval_at(TokioInstant::now() + duration, duration); inner.set_missed_tick_behavior(MissedTickBehavior::Skip); Self { inner } } From 25d9b19dd42833bdc5cb269ebf8cb63956893eea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yolier=20Gal=C3=A1n=20Tass=C3=A9?= Date: Thu, 1 Sep 2022 16:16:52 -0400 Subject: [PATCH 29/30] chore(workflow): remove testing workflow --- .github/workflows/testing.yml | 134 ---------------------------------- 1 file changed, 134 deletions(-) delete mode 100644 .github/workflows/testing.yml diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml deleted file mode 100644 index eb77060a8ee..00000000000 --- a/.github/workflows/testing.yml +++ /dev/null @@ -1,134 +0,0 @@ -name: Pre testing - -on: - workflow_dispatch - -jobs: - testing-all-features: - name: Build and test - runs-on: ubuntu-latest - strategy: - matrix: - args: [ - "--no-default-features", - "--all-features", - "--benches --all-features", - ] - steps: - - - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 - with: - access_token: ${{ github.token }} - - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - - uses: actions/checkout@v3 - - - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - with: - key: ${{ matrix.args }} - - - run: cargo test --workspace ${{ matrix.args }} - - check-rustdoc-links: - name: Check rustdoc intra-doc links - runs-on: ubuntu-latest - steps: - - - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 - with: - access_token: ${{ github.token }} - - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - - uses: actions/checkout@v3 - - - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 - with: - profile: minimal - toolchain: stable - override: true - - - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - - - name: Check rustdoc links - run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items --all-features - - check-clippy: - runs-on: ubuntu-latest - steps: - - - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 - with: - access_token: ${{ github.token }} - - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - - uses: actions/checkout@v3 - - - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 - with: - profile: minimal - toolchain: stable - override: true - components: clippy - - - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - - - name: Run cargo clippy - uses: actions-rs/cargo@844f36862e911db73fe0815f00a4a2602c279505 # v1.0.3 - with: - command: custom-clippy # cargo alias to allow reuse of config locally - - integration-test: - name: Integration tests - runs-on: ubuntu-latest - steps: - - - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 - with: - access_token: ${{ github.token }} - - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - - uses: actions/checkout@v3 - - - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 - with: - profile: minimal - toolchain: stable - override: true - - - uses: Swatinem/rust-cache@6720f05bc48b77f96918929a9019fb2203ff71f8 # v2.0.0 - - - name: Run ipfs-kad example - run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad - - rustfmt: - runs-on: ubuntu-latest - steps: - - - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0 - with: - access_token: ${{ github.token }} - - - uses: actions/checkout@v3 - - - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7 - with: - profile: minimal - toolchain: stable - override: true - components: rustfmt - - - name: Check formatting - run: cargo fmt -- --check From 4e44dc9bcbe5ae3c17479dfe488a99d384db2335 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 2 Sep 2022 12:26:18 +0900 Subject: [PATCH 30/30] protocols/mdns: Update changelog Given the latest release, this patch now targets `libp2p-mdns` `v0.40.0` and not `v0.39.0`. --- protocols/mdns/CHANGELOG.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 17df24fa1f6..4a22bdb1c3f 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -2,21 +2,21 @@ - Update to `libp2p-swarm` `v0.39.0`. -# 0.39.0 - -- Update to `libp2p-swarm` `v0.38.0`. -- Update to `if-watch` `v1.1.1`. - - Allow users to choose between async-io and tokio runtime in the mdns protocol implementation. `async-io` is a default feature, with an additional `tokio` feature (see [PR 2748]) -- Update to `libp2p-core` `v0.35.0`. - - Fix high CPU usage with Tokio library (see [PR 2748]). [PR 2748]: https://github.com/libp2p/rust-libp2p/pull/2748 +# 0.39.0 + +- Update to `libp2p-swarm` `v0.38.0`. +- Update to `if-watch` `v1.1.1`. + +- Update to `libp2p-core` `v0.35.0`. + # 0.38.0 - Update to `libp2p-core` `v0.34.0`.