Skip to content

Commit

Permalink
protocols/mdns: Allow users to choose between async-io and tokio runt…
Browse files Browse the repository at this point in the history
…ime (#2748)

Allow users to choose between async-io and tokio runtime
in the mdns protocol implementation. `async-io` is a default
feature, with an additional `tokio` feature.

Fix high CPU usage with Tokio library.
  • Loading branch information
gallegogt committed Sep 2, 2022
1 parent 36a2773 commit 89f898c
Show file tree
Hide file tree
Showing 12 changed files with 590 additions and 130 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Expand Up @@ -19,7 +19,7 @@ default = [
"identify",
"kad",
"gossipsub",
"mdns",
"mdns-async-io",
"mplex",
"noise",
"ping",
Expand All @@ -46,7 +46,8 @@ identify = ["dep:libp2p-identify", "libp2p-metrics?/identify"]
kad = ["dep:libp2p-kad", "libp2p-metrics?/kad"]
gossipsub = ["dep:libp2p-gossipsub", "libp2p-metrics?/gossipsub"]
metrics = ["dep:libp2p-metrics"]
mdns = ["dep:libp2p-mdns"]
mdns-async-io = ["dep:libp2p-mdns", "libp2p-mdns?/async-io"]
mdns-tokio = ["dep:libp2p-mdns", "libp2p-mdns?/tokio"]
mplex = ["dep:libp2p-mplex"]
noise = ["dep:libp2p-noise"]
ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"]
Expand Down Expand Up @@ -106,7 +107,7 @@ smallvec = "1.6.1"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true }
libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false }
libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true }

Expand Down Expand Up @@ -160,7 +161,7 @@ required-features = ["floodsub"]

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns"]
required-features = ["tcp-tokio", "mdns-tokio"]

[[example]]
name = "file-sharing"
Expand Down
14 changes: 9 additions & 5 deletions examples/chat-tokio.rs
Expand Up @@ -25,23 +25,27 @@
//! The example is run per node as follows:
//!
//! ```sh
//! cargo run --example chat-tokio --features="tcp-tokio mdns"
//! cargo run --example chat-tokio --features="tcp-tokio mdns-tokio"
//! ```
//!
//! Alternatively, to run with the minimal set of features and crates:
//!
//! ```sh
//!cargo run --example chat-tokio \\
//! --no-default-features \\
//! --features="floodsub mplex noise tcp-tokio mdns"
//! --features="floodsub mplex noise tcp-tokio mdns-tokio"
//! ```

use futures::StreamExt;
use libp2p::{
core::upgrade,
floodsub::{self, Floodsub, FloodsubEvent},
identity,
mdns::{Mdns, MdnsEvent},
mdns::{
MdnsEvent,
// `TokioMdns` is available through the `mdns-tokio` feature.
TokioMdns,
},
mplex,
noise,
swarm::{SwarmBuilder, SwarmEvent},
Expand Down Expand Up @@ -88,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
floodsub: Floodsub,
mdns: Mdns,
mdns: TokioMdns,
}

#[allow(clippy::large_enum_variant)]
Expand All @@ -111,7 +115,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events.
let mut swarm = {
let mdns = Mdns::new(Default::default()).await?;
let mdns = TokioMdns::new(Default::default()).await?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
Expand Down
8 changes: 8 additions & 0 deletions protocols/mdns/CHANGELOG.md
Expand Up @@ -2,6 +2,14 @@

- Update to `libp2p-swarm` `v0.39.0`.

- Allow users to choose between async-io and tokio runtime
in the mdns protocol implementation. `async-io` is a default
feature, with an additional `tokio` feature (see [PR 2748])

- Fix high CPU usage with Tokio library (see [PR 2748]).

[PR 2748]: https://github.com/libp2p/rust-libp2p/pull/2748

# 0.39.0

- Update to `libp2p-swarm` `v0.38.0`.
Expand Down
23 changes: 20 additions & 3 deletions protocols/mdns/Cargo.toml
Expand Up @@ -11,7 +11,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-io = "1.3.1"
data-encoding = "2.3.2"
dns-parser = "0.8.0"
futures = "0.3.13"
Expand All @@ -25,8 +24,26 @@ smallvec = "1.6.1"
socket2 = { version = "0.4.0", features = ["all"] }
void = "1.0.2"

async-io = { version = "1.3.1", optional = true }
tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true}

[features]
default = ["async-io"]
tokio = ["dep:tokio"]
async-io = ["dep:async-io"]

[dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"] }
env_logger = "0.9.0"
libp2p = { path = "../..", default-features = false, features = ["mdns", "tcp-async-io", "dns-async-std", "websocket", "noise", "mplex", "yamux"] }
tokio = { version = "1.15", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] }
libp2p = { path = "../..", default-features = false, features = ["mdns-async-io", "tcp-async-io", "dns-async-std", "tcp-tokio", "dns-tokio", "websocket", "noise", "mplex", "yamux"] }
tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] }


[[test]]
name = "use-async-std"
required-features = ["async-io"]

[[test]]
name = "use-tokio"
required-features = ["tokio"]

43 changes: 34 additions & 9 deletions protocols/mdns/src/behaviour.rs
Expand Up @@ -19,11 +19,14 @@
// DEALINGS IN THE SOFTWARE.

mod iface;
mod socket;
mod timer;

use self::iface::InterfaceState;
use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig;
use async_io::Timer;
use futures::prelude::*;
use futures::Stream;
use if_watch::{IfEvent, IfWatcher};
use libp2p_core::transport::ListenerId;
use libp2p_core::{Multiaddr, PeerId};
Expand All @@ -35,18 +38,32 @@ use smallvec::SmallVec;
use std::collections::hash_map::{Entry, HashMap};
use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant};

#[cfg(feature = "async-io")]
use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer};

/// The type of a [`GenMdns`] using the `async-io` implementation.
#[cfg(feature = "async-io")]
pub type Mdns = GenMdns<AsyncUdpSocket, AsyncTimer>;

#[cfg(feature = "tokio")]
use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer};

/// The type of a [`GenMdns`] using the `tokio` implementation.
#[cfg(feature = "tokio")]
pub type TokioMdns = GenMdns<TokioUdpSocket, TokioTimer>;

/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
#[derive(Debug)]
pub struct Mdns {
pub struct GenMdns<S, T> {
/// InterfaceState config.
config: MdnsConfig,

/// Iface watcher.
if_watch: IfWatcher,

/// Mdns interface states.
iface_states: HashMap<IpAddr, InterfaceState>,
iface_states: HashMap<IpAddr, InterfaceState<S, T>>,

/// List of nodes that we have discovered, the address, and when their TTL expires.
///
Expand All @@ -57,10 +74,13 @@ pub struct Mdns {
/// Future that fires when the TTL of at least one node in `discovered_nodes` expires.
///
/// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Timer>,
closest_expiration: Option<T>,
}

impl Mdns {
impl<S, T> GenMdns<S, T>
where
T: Builder,
{
/// Builds a new `Mdns` behaviour.
pub async fn new(config: MdnsConfig) -> io::Result<Self> {
let if_watch = if_watch::IfWatcher::new().await?;
Expand Down Expand Up @@ -91,11 +111,15 @@ impl Mdns {
*expires = now;
}
}
self.closest_expiration = Some(Timer::at(now));
self.closest_expiration = Some(T::at(now));
}
}

impl NetworkBehaviour for Mdns {
impl<S, T> NetworkBehaviour for GenMdns<S, T>
where
T: Builder + Stream,
S: AsyncSocket,
{
type ConnectionHandler = DummyConnectionHandler;
type OutEvent = MdnsEvent;

Expand Down Expand Up @@ -219,8 +243,9 @@ impl NetworkBehaviour for Mdns {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
if let Some(closest_expiration) = closest_expiration {
let mut timer = Timer::at(closest_expiration);
let _ = Pin::new(&mut timer).poll(cx);
let mut timer = T::at(closest_expiration);
let _ = Pin::new(&mut timer).poll_next(cx);

self.closest_expiration = Some(timer);
}
Poll::Pending
Expand Down

0 comments on commit 89f898c

Please sign in to comment.