Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/mdns: Allow users to choose between async-io and tokio runtime #2748

Merged
merged 30 commits into from Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9a605b6
ci(*): testing workflow
gallegogt Aug 24, 2022
002a533
ci(*): update workflow name
gallegogt Aug 24, 2022
c75f3c2
Permit use mdns with Tokio or AsyncIO runtime
gallegogt Sep 1, 2022
10baffe
test(mdns): renamed mdns tests
gallegogt Jul 7, 2022
a330300
fix(protocols/mdns): fix high cpu usage with tokio library
gallegogt Jul 7, 2022
084a665
fix(examples/chat-tokio):update mdns-tokio feature
gallegogt Jul 7, 2022
4ee1c70
chore(protocols/mdns): update changelog
gallegogt Jul 7, 2022
1f19654
fix(protocols/mdns):permit build with all features
gallegogt Jul 9, 2022
328a186
chore(protocols/mdns): update changelog
gallegogt Jul 9, 2022
1898a32
style(chat-tokio): fix formating error
gallegogt Jul 14, 2022
57bd2db
chore(protocols/mdns): use deps syntax
gallegogt Jul 14, 2022
cfb9a9e
fix(protocols/mdns): clean the code
gallegogt Jul 14, 2022
c57ee57
fix(examples/chat-tokio): fix typo
gallegogt Jul 15, 2022
f20dbbe
chore(protocols/mdns): add pr link to changelog
gallegogt Jul 15, 2022
6e1c8e5
chore(protocols/mdns): fix changelog
gallegogt Jul 17, 2022
beeb1a1
fix(protocols/mdns): rename socket functions
gallegogt Jul 17, 2022
40c8485
fix(protocols/mdns): rename TimerBuilder trait
gallegogt Jul 17, 2022
f57a375
fix(protocols/mdns): add cfg_attr docsr
gallegogt Jul 17, 2022
c1de5bc
fix(protocols/mdns): update mdns tests
gallegogt Jul 17, 2022
5b21637
chore(protocols/mdns): add fmt
gallegogt Jul 17, 2022
c142bde
fix(protocols/mdns): update socket and timer impl
gallegogt Jul 18, 2022
f80f93e
fix(protocols/mdns): fix implementation logic and style
gallegogt Jul 18, 2022
e19f2c3
doc(protocols/mdns): fix documentation
gallegogt Jul 19, 2022
01643bb
fix(protocols/mdns): fix typo and logic
gallegogt Jul 20, 2022
92e219a
fix(protocols/mdns): sockets pinned on poll
gallegogt Jul 27, 2022
d6045ca
fix(protocols/mdns): change tokio interval missed tick behavior
gallegogt Jul 28, 2022
393c19f
refactor(mdns/test): update tokio expiration test
gallegogt Aug 22, 2022
d7e798c
fix(protocol/mdns): change interval behavior for tokio timer impl
gallegogt Aug 24, 2022
25d9b19
chore(workflow): remove testing workflow
gallegogt Sep 1, 2022
4e44dc9
protocols/mdns: Update changelog
mxinden Sep 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.toml
Expand Up @@ -19,7 +19,7 @@ default = [
"identify",
"kad",
"gossipsub",
"mdns",
"mdns-async-io",
"mplex",
"noise",
"ping",
Expand All @@ -46,7 +46,8 @@ identify = ["dep:libp2p-identify", "libp2p-metrics?/identify"]
kad = ["dep:libp2p-kad", "libp2p-metrics?/kad"]
gossipsub = ["dep:libp2p-gossipsub", "libp2p-metrics?/gossipsub"]
metrics = ["dep:libp2p-metrics"]
mdns = ["dep:libp2p-mdns"]
mdns-async-io = ["dep:libp2p-mdns", "libp2p-mdns?/async-io"]
mdns-tokio = ["dep:libp2p-mdns", "libp2p-mdns?/tokio"]
mplex = ["dep:libp2p-mplex"]
noise = ["dep:libp2p-noise"]
ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"]
Expand Down Expand Up @@ -106,7 +107,7 @@ smallvec = "1.6.1"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true }
libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false }
libp2p-tcp = { version = "0.35.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true }

Expand Down Expand Up @@ -160,7 +161,7 @@ required-features = ["floodsub"]

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns"]
required-features = ["tcp-tokio", "mdns-tokio"]

[[example]]
name = "file-sharing"
Expand Down
14 changes: 9 additions & 5 deletions examples/chat-tokio.rs
Expand Up @@ -25,23 +25,27 @@
//! The example is run per node as follows:
//!
//! ```sh
//! cargo run --example chat-tokio --features="tcp-tokio mdns"
//! cargo run --example chat-tokio --features="tcp-tokio mdns-tokio"
//! ```
//!
//! Alternatively, to run with the minimal set of features and crates:
//!
//! ```sh
//!cargo run --example chat-tokio \\
//! --no-default-features \\
//! --features="floodsub mplex noise tcp-tokio mdns"
//! --features="floodsub mplex noise tcp-tokio mdns-tokio"
gallegogt marked this conversation as resolved.
Show resolved Hide resolved
//! ```

use futures::StreamExt;
use libp2p::{
core::upgrade,
floodsub::{self, Floodsub, FloodsubEvent},
identity,
mdns::{Mdns, MdnsEvent},
mdns::{
MdnsEvent,
// `TokioMdns` is available through the `mdns-tokio` feature.
TokioMdns,
},
mplex,
noise,
swarm::{SwarmBuilder, SwarmEvent},
Expand Down Expand Up @@ -88,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
floodsub: Floodsub,
mdns: Mdns,
mdns: TokioMdns,
}

#[allow(clippy::large_enum_variant)]
Expand All @@ -111,7 +115,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events.
let mut swarm = {
let mdns = Mdns::new(Default::default()).await?;
let mdns = TokioMdns::new(Default::default()).await?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
Expand Down
8 changes: 8 additions & 0 deletions protocols/mdns/CHANGELOG.md
Expand Up @@ -2,6 +2,14 @@

- Update to `libp2p-swarm` `v0.39.0`.

- Allow users to choose between async-io and tokio runtime
in the mdns protocol implementation. `async-io` is a default
feature, with an additional `tokio` feature (see [PR 2748])

- Fix high CPU usage with Tokio library (see [PR 2748]).
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

[PR 2748]: https://github.com/libp2p/rust-libp2p/pull/2748

# 0.39.0

- Update to `libp2p-swarm` `v0.38.0`.
Expand Down
23 changes: 20 additions & 3 deletions protocols/mdns/Cargo.toml
Expand Up @@ -11,7 +11,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-io = "1.3.1"
data-encoding = "2.3.2"
dns-parser = "0.8.0"
futures = "0.3.13"
Expand All @@ -25,8 +24,26 @@ smallvec = "1.6.1"
socket2 = { version = "0.4.0", features = ["all"] }
void = "1.0.2"

async-io = { version = "1.3.1", optional = true }
tokio = { version = "1.19", default-features = false, features = ["net", "time"], optional = true}

[features]
default = ["async-io"]
tokio = ["dep:tokio"]
async-io = ["dep:async-io"]

[dev-dependencies]
async-std = { version = "1.9.0", features = ["attributes"] }
env_logger = "0.9.0"
libp2p = { path = "../..", default-features = false, features = ["mdns", "tcp-async-io", "dns-async-std", "websocket", "noise", "mplex", "yamux"] }
tokio = { version = "1.15", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] }
libp2p = { path = "../..", default-features = false, features = ["mdns-async-io", "tcp-async-io", "dns-async-std", "tcp-tokio", "dns-tokio", "websocket", "noise", "mplex", "yamux"] }
tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] }


[[test]]
name = "use-async-std"
required-features = ["async-io"]

[[test]]
name = "use-tokio"
required-features = ["tokio"]

43 changes: 34 additions & 9 deletions protocols/mdns/src/behaviour.rs
Expand Up @@ -19,11 +19,14 @@
// DEALINGS IN THE SOFTWARE.

mod iface;
mod socket;
mod timer;

use self::iface::InterfaceState;
use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig;
use async_io::Timer;
use futures::prelude::*;
use futures::Stream;
use if_watch::{IfEvent, IfWatcher};
use libp2p_core::transport::ListenerId;
use libp2p_core::{Multiaddr, PeerId};
Expand All @@ -35,18 +38,32 @@ use smallvec::SmallVec;
use std::collections::hash_map::{Entry, HashMap};
use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant};

#[cfg(feature = "async-io")]
use crate::behaviour::{socket::asio::AsyncUdpSocket, timer::asio::AsyncTimer};

/// The type of a [`GenMdns`] using the `async-io` implementation.
#[cfg(feature = "async-io")]
pub type Mdns = GenMdns<AsyncUdpSocket, AsyncTimer>;

#[cfg(feature = "tokio")]
use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer};

/// The type of a [`GenMdns`] using the `tokio` implementation.
#[cfg(feature = "tokio")]
pub type TokioMdns = GenMdns<TokioUdpSocket, TokioTimer>;

/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
#[derive(Debug)]
pub struct Mdns {
pub struct GenMdns<S, T> {
/// InterfaceState config.
config: MdnsConfig,

/// Iface watcher.
if_watch: IfWatcher,

/// Mdns interface states.
iface_states: HashMap<IpAddr, InterfaceState>,
iface_states: HashMap<IpAddr, InterfaceState<S, T>>,

/// List of nodes that we have discovered, the address, and when their TTL expires.
///
Expand All @@ -57,10 +74,13 @@ pub struct Mdns {
/// Future that fires when the TTL of at least one node in `discovered_nodes` expires.
///
/// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Timer>,
closest_expiration: Option<T>,
}

impl Mdns {
impl<S, T> GenMdns<S, T>
where
T: Builder,
{
/// Builds a new `Mdns` behaviour.
pub async fn new(config: MdnsConfig) -> io::Result<Self> {
let if_watch = if_watch::IfWatcher::new().await?;
Expand Down Expand Up @@ -91,11 +111,15 @@ impl Mdns {
*expires = now;
}
}
self.closest_expiration = Some(Timer::at(now));
self.closest_expiration = Some(T::at(now));
}
}

impl NetworkBehaviour for Mdns {
impl<S, T> NetworkBehaviour for GenMdns<S, T>
where
T: Builder + Stream,
S: AsyncSocket,
{
type ConnectionHandler = DummyConnectionHandler;
type OutEvent = MdnsEvent;

Expand Down Expand Up @@ -219,8 +243,9 @@ impl NetworkBehaviour for Mdns {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
if let Some(closest_expiration) = closest_expiration {
let mut timer = Timer::at(closest_expiration);
let _ = Pin::new(&mut timer).poll(cx);
let mut timer = T::at(closest_expiration);
let _ = Pin::new(&mut timer).poll_next(cx);

self.closest_expiration = Some(timer);
}
Poll::Pending
Expand Down