diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index fc4094f16ab..e276411af0e 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -52,6 +52,7 @@ libp2p-swarm-derive = { path = "../swarm-derive" } # Using `pat libp2p-swarm-test = { path = "../swarm-test" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. libp2p-yamux = { path = "../muxers/yamux" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. quickcheck = { workspace = true } +criterion = "0.5" void = "1" once_cell = "1.19.0" trybuild = "1.0.91" @@ -69,5 +70,9 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] rustc-args = ["--cfg", "docsrs"] +[[bench]] +name = "connection_handler" +harness = false + [lints] workspace = true diff --git a/swarm/benches/connection_handler.rs b/swarm/benches/connection_handler.rs new file mode 100644 index 00000000000..ac13fa09cfe --- /dev/null +++ b/swarm/benches/connection_handler.rs @@ -0,0 +1,259 @@ +use async_std::stream::StreamExt; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures::FutureExt; +use instant::Duration; +use libp2p_core::{ + transport::MemoryTransport, InboundUpgrade, Multiaddr, OutboundUpgrade, Transport, UpgradeInfo, +}; +use libp2p_swarm::{ConnectionHandler, NetworkBehaviour, StreamProtocol}; +use std::convert::Infallible; + +criterion_main!(one_behavior_many_protocols); + +macro_rules! benchmark_one_behaviour_many_protocols { + ($( + $name:ident, + )*) => { + $( + #[tokio::main(flavor = "multi_thread", worker_threads = 1)] + async fn $name(c: &mut Criterion) { + let protocol_count = parse_counts(stringify!($name)); + bench_run_one_behaviour_many_protocols(c, protocol_count, 100000); + } + )* + + criterion_group!( + one_behavior_many_protocols, + $( + $name, + )* + ); + }; +} + +benchmark_one_behaviour_many_protocols! { + one_behavior_many_protocols_100, + one_behavior_many_protocols_1000, + one_behavior_many_protocols_10000, +} + +fn parse_counts(name: &str) -> usize { + name.split('_').next_back().unwrap().parse().unwrap() +} + +fn new_swarm(protocol_count: usize, spam_count: usize) -> libp2p_swarm::Swarm { + // we leak to simulate static protocols witch is the common case + let protocols = (0..protocol_count) + .map(|i| StreamProtocol::new(format!("/protocol/{i}").leak())) + .collect::>() + .leak(); + + let swarm_a_keypair = libp2p_identity::Keypair::generate_ed25519(); + libp2p_swarm::Swarm::new( + MemoryTransport::new() + .upgrade(multistream_select::Version::V1) + .authenticate(libp2p_plaintext::Config::new(&swarm_a_keypair)) + .multiplex(libp2p_yamux::Config::default()) + .boxed(), + PollerBehaviour { + spam_count, + protocols, + }, + swarm_a_keypair.public().to_peer_id(), + libp2p_swarm::Config::with_tokio_executor().with_idle_connection_timeout(Duration::MAX), + ) +} + +fn bench_run_one_behaviour_many_protocols( + c: &mut Criterion, + protocol_count: usize, + spam_count: usize, +) { + let mut sa = new_swarm(protocol_count, spam_count); + let mut sb = new_swarm(protocol_count, spam_count); + + static mut OFFSET: usize = 0; + let offset = unsafe { + OFFSET += 1; + OFFSET + }; + + sa.listen_on(format!("/memory/{offset}").parse().unwrap()) + .unwrap(); + sb.dial(format!("/memory/{offset}").parse::().unwrap()) + .unwrap(); + + c.bench_function(&format!("one_behavior_many_protocols_{protocol_count}_{spam_count}"), |b| { + b.iter(|| { + futures::executor::block_on(async { + let [mut af, mut bf] = [false; 2]; + while !af || !bf { + futures::select! { + event = sb.next().fuse() => { + bf |= matches!(event, Some(libp2p_swarm::SwarmEvent::Behaviour(FinishedSpamming))); + } + event = sa.next().fuse() => { + af |= matches!(event, Some(libp2p_swarm::SwarmEvent::Behaviour(FinishedSpamming))); + } + } + } + }); + }); + }); +} + +struct PollerBehaviour { + spam_count: usize, + protocols: &'static [StreamProtocol], +} + +#[derive(Debug)] +struct FinishedSpamming; + +impl NetworkBehaviour for PollerBehaviour { + type ConnectionHandler = PollerHandler; + type ToSwarm = FinishedSpamming; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_identity::PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(PollerHandler { + spam_count: 0, + protocols: self.protocols, + }) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_identity::PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(PollerHandler { + spam_count: self.spam_count, + protocols: self.protocols, + }) + } + + fn on_swarm_event(&mut self, _: libp2p_swarm::FromSwarm) {} + + fn on_connection_handler_event( + &mut self, + _peer_id: libp2p_identity::PeerId, + _connection_id: libp2p_swarm::ConnectionId, + _event: libp2p_swarm::THandlerOutEvent, + ) { + self.spam_count = 0; + } + + fn poll( + &mut self, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + if self.spam_count == 0 { + std::task::Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(FinishedSpamming)) + } else { + std::task::Poll::Pending + } + } +} + +#[derive(Default)] +struct PollerHandler { + spam_count: usize, + protocols: &'static [StreamProtocol], +} + +impl ConnectionHandler for PollerHandler { + type FromBehaviour = Infallible; + + type ToBehaviour = FinishedSpamming; + + type InboundProtocol = Upgrade; + + type OutboundProtocol = Upgrade; + + type InboundOpenInfo = (); + + type OutboundOpenInfo = (); + + fn listen_protocol( + &self, + ) -> libp2p_swarm::SubstreamProtocol { + libp2p_swarm::SubstreamProtocol::new(Upgrade(self.protocols), ()) + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll< + libp2p_swarm::ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + >, + > { + if self.spam_count != 0 { + self.spam_count -= 1; + cx.waker().wake_by_ref(); + return std::task::Poll::Pending; + } + + self.spam_count = usize::MAX; + std::task::Poll::Ready(libp2p_swarm::ConnectionHandlerEvent::NotifyBehaviour( + FinishedSpamming, + )) + } + + fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) { + match _event {} + } + + fn on_connection_event( + &mut self, + _event: libp2p_swarm::handler::ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } +} + +pub struct Upgrade(&'static [StreamProtocol]); + +impl UpgradeInfo for Upgrade { + type Info = &'static StreamProtocol; + type InfoIter = std::slice::Iter<'static, StreamProtocol>; + + fn protocol_info(&self) -> Self::InfoIter { + self.0.iter() + } +} + +impl OutboundUpgrade for Upgrade { + type Output = libp2p_swarm::Stream; + type Error = Infallible; + type Future = futures::future::Ready>; + + fn upgrade_outbound(self, s: libp2p_swarm::Stream, _: Self::Info) -> Self::Future { + futures::future::ready(Ok(s)) + } +} + +impl InboundUpgrade for Upgrade { + type Output = libp2p_swarm::Stream; + type Error = Infallible; + type Future = futures::future::Ready>; + + fn upgrade_inbound(self, s: libp2p_swarm::Stream, _: Self::Info) -> Self::Future { + futures::future::ready(Ok(s)) + } +} diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 50aa9f8a0fb..a840ab75204 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -31,8 +31,7 @@ pub use supported_protocols::SupportedProtocols; use crate::handler::{ AddressChange, ConnectionEvent, ConnectionHandler, DialUpgradeError, FullyNegotiatedInbound, - FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange, - UpgradeInfoSend, + FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsChange, UpgradeInfoSend, }; use crate::stream::ActiveStreamCounter; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 4e98281c23c..0d25c44d7b2 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -53,7 +53,7 @@ pub use map_out::MapOutEvent; pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use pending::PendingConnectionHandler; pub use select::ConnectionHandlerSelect; -use smallvec::{smallvec, SmallVec}; +use smallvec::SmallVec; use crate::StreamProtocol; use core::slice; @@ -731,7 +731,7 @@ where mod test { use super::*; - fn preprocess(s: &'static str) -> HashSet { + fn protocol_set_of(s: &'static str) -> HashSet { s.split_whitespace() .map(|p| StreamProtocol::try_from_owned(format!("/{p}")).unwrap()) .collect() @@ -752,57 +752,57 @@ mod test { #[test] fn test_protocol_remove_subset() { - let mut existing = preprocess("a b c"); - let to_remove = preprocess("a b"); + let mut existing = protocol_set_of("a b c"); + let to_remove = protocol_set_of("a b"); let change = test_remove(&mut existing, to_remove); - assert_eq!(existing, preprocess("c")); - assert_eq!(change, preprocess("a b")); + assert_eq!(existing, protocol_set_of("c")); + assert_eq!(change, protocol_set_of("a b")); } #[test] fn test_protocol_remove_all() { - let mut existing = preprocess("a b c"); - let to_remove = preprocess("a b c"); + let mut existing = protocol_set_of("a b c"); + let to_remove = protocol_set_of("a b c"); let change = test_remove(&mut existing, to_remove); - assert_eq!(existing, preprocess("")); - assert_eq!(change, preprocess("a b c")); + assert_eq!(existing, protocol_set_of("")); + assert_eq!(change, protocol_set_of("a b c")); } #[test] fn test_protocol_remove_superset() { - let mut existing = preprocess("a b c"); - let to_remove = preprocess("a b c d"); + let mut existing = protocol_set_of("a b c"); + let to_remove = protocol_set_of("a b c d"); let change = test_remove(&mut existing, to_remove); - assert_eq!(existing, preprocess("")); - assert_eq!(change, preprocess("a b c")); + assert_eq!(existing, protocol_set_of("")); + assert_eq!(change, protocol_set_of("a b c")); } #[test] fn test_protocol_remove_none() { - let mut existing = preprocess("a b c"); - let to_remove = preprocess("d"); + let mut existing = protocol_set_of("a b c"); + let to_remove = protocol_set_of("d"); let change = test_remove(&mut existing, to_remove); - assert_eq!(existing, preprocess("a b c")); - assert_eq!(change, preprocess("")); + assert_eq!(existing, protocol_set_of("a b c")); + assert_eq!(change, protocol_set_of("")); } #[test] fn test_protocol_remove_none_from_empty() { - let mut existing = preprocess(""); - let to_remove = preprocess("d"); + let mut existing = protocol_set_of(""); + let to_remove = protocol_set_of("d"); let change = test_remove(&mut existing, to_remove); - assert_eq!(existing, preprocess("")); - assert_eq!(change, preprocess("")); + assert_eq!(existing, protocol_set_of("")); + assert_eq!(change, protocol_set_of("")); } fn test_from_full_sets( @@ -840,56 +840,56 @@ mod test { #[test] fn test_from_full_stes_subset() { - let existing = preprocess("a b c"); - let new = preprocess("a b"); + let existing = protocol_set_of("a b c"); + let new = protocol_set_of("a b"); let [removed_changes, added_changes] = test_from_full_sets(existing, new); - assert_eq!(added_changes, preprocess("")); - assert_eq!(removed_changes, preprocess("c")); + assert_eq!(added_changes, protocol_set_of("")); + assert_eq!(removed_changes, protocol_set_of("c")); } #[test] fn test_from_full_sets_superset() { - let existing = preprocess("a b"); - let new = preprocess("a b c"); + let existing = protocol_set_of("a b"); + let new = protocol_set_of("a b c"); let [removed_changes, added_changes] = test_from_full_sets(existing, new); - assert_eq!(added_changes, preprocess("c")); - assert_eq!(removed_changes, preprocess("")); + assert_eq!(added_changes, protocol_set_of("c")); + assert_eq!(removed_changes, protocol_set_of("")); } #[test] fn test_from_full_sets_intersection() { - let existing = preprocess("a b c"); - let new = preprocess("b c d"); + let existing = protocol_set_of("a b c"); + let new = protocol_set_of("b c d"); let [removed_changes, added_changes] = test_from_full_sets(existing, new); - assert_eq!(added_changes, preprocess("d")); - assert_eq!(removed_changes, preprocess("a")); + assert_eq!(added_changes, protocol_set_of("d")); + assert_eq!(removed_changes, protocol_set_of("a")); } #[test] fn test_from_full_sets_disjoint() { - let existing = preprocess("a b c"); - let new = preprocess("d e f"); + let existing = protocol_set_of("a b c"); + let new = protocol_set_of("d e f"); let [removed_changes, added_changes] = test_from_full_sets(existing, new); - assert_eq!(added_changes, preprocess("d e f")); - assert_eq!(removed_changes, preprocess("a b c")); + assert_eq!(added_changes, protocol_set_of("d e f")); + assert_eq!(removed_changes, protocol_set_of("a b c")); } #[test] fn test_from_full_sets_empty() { - let existing = preprocess(""); - let new = preprocess(""); + let existing = protocol_set_of(""); + let new = protocol_set_of(""); let [removed_changes, added_changes] = test_from_full_sets(existing, new); - assert_eq!(added_changes, preprocess("")); - assert_eq!(removed_changes, preprocess("")); + assert_eq!(added_changes, protocol_set_of("")); + assert_eq!(removed_changes, protocol_set_of("")); } }