Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(swarm): eliminating protocol cloning when nothing is happening #5026

Open
wants to merge 44 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
40ff8d1
eliminating protocol cloning when nothing is happening
jakubDoka Dec 22, 2023
d8417ea
Merge branch 'master' into master
jakubDoka Dec 25, 2023
b9ffdae
switching to hashsets
jakubDoka Dec 25, 2023
6628dd4
adding benchmark
jakubDoka Dec 25, 2023
bacb93c
modifiing benchmark
jakubDoka Dec 25, 2023
98b2eb1
optimizing further
jakubDoka Dec 25, 2023
c271dbd
making the protocol name lengths more realiztic
jakubDoka Dec 25, 2023
d5edce0
Merge branch 'master' into master
jakubDoka Dec 25, 2023
b7ab499
Merge branch 'master' into master
jakubDoka Dec 28, 2023
5860200
reducing dif
jakubDoka Dec 28, 2023
adaaf7a
adding benchmark
jakubDoka Dec 28, 2023
016a3fa
appliing suggestions
jakubDoka Dec 29, 2023
a7ec976
Merge branch 'master' into master
jakubDoka Jan 2, 2024
3a53071
Merge branch 'master' into master
jakubDoka Jan 9, 2024
8ff2371
Merge branch 'master' into master
jakubDoka Jan 9, 2024
63a5891
Update swarm/src/handler.rs
jakubDoka Jan 17, 2024
bc72a1c
Update swarm/src/connection.rs
jakubDoka Jan 17, 2024
2cf3210
Merge branch 'master' into master
jakubDoka Jan 17, 2024
2e4061c
temp
jakubDoka Jan 17, 2024
18867e7
resolving more comments
jakubDoka Jan 17, 2024
651b298
Merge branch 'master' into master
jakubDoka Jan 21, 2024
5aff909
Merge branch 'master' into master
jakubDoka Jan 23, 2024
fcfa598
Merge branch 'master' into master
jakubDoka Apr 5, 2024
5d344c4
Update swarm/src/handler.rs
jakubDoka Apr 5, 2024
f96d187
Update swarm/src/handler.rs
jakubDoka Apr 5, 2024
8e597d2
Update swarm/src/handler.rs
jakubDoka Apr 5, 2024
0cee240
Update swarm/src/handler.rs
jakubDoka Apr 5, 2024
044bbde
Update swarm/src/handler.rs
jakubDoka Apr 5, 2024
69baf9c
Update swarm/src/handler.rs
jakubDoka Apr 5, 2024
d9c273a
refactored tests to make them more readable
jakubDoka Apr 5, 2024
c243595
reducing work done by connection handler to minimum
jakubDoka Apr 9, 2024
762f636
adjusting the load
jakubDoka Apr 9, 2024
537c658
accident
jakubDoka Apr 9, 2024
82d474d
cleaning up benchmark names
jakubDoka Apr 9, 2024
0c59e4d
Merge branch 'master' into master
jakubDoka Apr 10, 2024
82de4af
change in lock
jakubDoka Apr 10, 2024
d6c8fc0
changing the protocols to owned variant
jakubDoka Apr 11, 2024
019f3ca
renaming
jakubDoka Apr 11, 2024
a432ed1
Update swarm/benches/connection_handler.rs
jakubDoka Apr 20, 2024
24fc0ca
cleaning up benchmarks
jakubDoka Apr 20, 2024
cee332f
Merge branch 'master' into master
jakubDoka Apr 20, 2024
35964a8
fixes
jakubDoka Apr 20, 2024
d071400
Merge branch 'master' into master
jakubDoka Apr 27, 2024
913a694
Merge branch 'master' into master
jakubDoka Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
229 changes: 155 additions & 74 deletions swarm/src/connection.rs
Expand Up @@ -32,7 +32,7 @@ pub use supported_protocols::SupportedProtocols;
use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, DialUpgradeError, FullyNegotiatedInbound,
FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange,
UpgradeInfoSend,
ProtocolsRemoved, UpgradeInfoSend,
};
use crate::stream::ActiveStreamCounter;
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
Expand All @@ -52,7 +52,7 @@ use libp2p_core::upgrade;
use libp2p_core::upgrade::{NegotiationError, ProtocolError};
use libp2p_core::Endpoint;
use libp2p_identity::PeerId;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -153,8 +153,11 @@ where
SubstreamRequested<THandler::OutboundOpenInfo, THandler::OutboundProtocol>,
>,

local_supported_protocols: HashSet<StreamProtocol>,
local_supported_protocols:
HashMap<AsStrHashEq<<THandler::InboundProtocol as UpgradeInfoSend>::Info>, bool>,
remote_supported_protocols: HashSet<StreamProtocol>,
temp_protocols: Vec<StreamProtocol>,
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved

idle_timeout: Duration,
stream_counter: ActiveStreamCounter,
}
Expand Down Expand Up @@ -186,10 +189,22 @@ where
max_negotiating_inbound_streams: usize,
idle_timeout: Duration,
) -> Self {
let initial_protocols = gather_supported_protocols(&handler);
if !initial_protocols.is_empty() {
let local_supported_protocols = handler
.listen_protocol()
.upgrade()
.protocol_info()
.map(|i| (AsStrHashEq(i), true))
.collect::<HashMap<_, _>>();
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved

if !local_supported_protocols.is_empty() {
let temp = local_supported_protocols
.keys()
.filter_map(|i| StreamProtocol::try_from_owned(i.0.as_ref().to_owned()).ok())
.collect::<Vec<_>>();
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(
ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)),
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
ProtocolsChange::Added(ProtocolsAdded {
protocols: temp.iter(),
}),
));
}
Connection {
Expand All @@ -201,8 +216,9 @@ where
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
requested_substreams: Default::default(),
local_supported_protocols: initial_protocols,
local_supported_protocols,
remote_supported_protocols: Default::default(),
temp_protocols: Default::default(),
idle_timeout,
stream_counter: ActiveStreamCounter::default(),
}
Expand Down Expand Up @@ -250,6 +266,7 @@ where
substream_upgrade_protocol_override,
local_supported_protocols: supported_protocols,
remote_supported_protocols,
temp_protocols,
idle_timeout,
stream_counter,
..
Expand Down Expand Up @@ -286,24 +303,35 @@ where
Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Added(protocols),
)) => {
if let Some(added) =
ProtocolsChange::add(remote_supported_protocols, &protocols)
{
handler.on_connection_event(ConnectionEvent::RemoteProtocolsChange(added));
remote_supported_protocols.extend(protocols);
let added = protocols
.into_iter()
.filter(|i| remote_supported_protocols.insert(i.clone()))
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<_>>();

if !added.is_empty() {
handler.on_connection_event(ConnectionEvent::RemoteProtocolsChange(
ProtocolsChange::Added(ProtocolsAdded {
protocols: added.iter(),
}),
));
}

continue;
}
Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Removed(protocols),
)) => {
if let Some(removed) =
ProtocolsChange::remove(remote_supported_protocols, &protocols)
{
handler
.on_connection_event(ConnectionEvent::RemoteProtocolsChange(removed));
remote_supported_protocols.retain(|p| !protocols.contains(p));
let removed = protocols
.into_iter()
.filter_map(|i| remote_supported_protocols.take(&i))
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<_>>();

if !removed.is_empty() {
handler.on_connection_event(ConnectionEvent::RemoteProtocolsChange(
ProtocolsChange::Removed(ProtocolsRemoved {
protocols: removed.iter(),
}),
));
}

continue;
Expand Down Expand Up @@ -431,17 +459,18 @@ where
}
}

let new_protocols = gather_supported_protocols(handler);
let changes = ProtocolsChange::from_full_sets(supported_protocols, &new_protocols);

if !changes.is_empty() {
for change in changes {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
}

*supported_protocols = new_protocols;

continue; // Go back to the top, handler can potentially make progress again.
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved
let changes = ProtocolsChange::from_full_sets(
supported_protocols,
handler.listen_protocol().upgrade().protocol_info(),
temp_protocols,
);
let mut has_changes = false;
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved
for change in changes {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
has_changes = true;
}
if has_changes {
continue;
}

return Poll::Pending; // Nothing can make progress, return `Pending`.
Expand All @@ -454,15 +483,6 @@ where
}
}

fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet<StreamProtocol> {
handler
.listen_protocol()
.upgrade()
.protocol_info()
.filter_map(|i| StreamProtocol::try_from_owned(i.as_ref().to_owned()).ok())
.collect()
}
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved

fn compute_new_shutdown(
handler_keep_alive: bool,
current_shutdown: &Shutdown,
Expand Down Expand Up @@ -737,6 +757,58 @@ enum Shutdown {
Later(Delay, Instant),
}

/// The endpoint roles associated with a pending peer-to-peer connection.
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PendingPoint {
/// The socket comes from a dialer.
///
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer {
/// Same as [`ConnectedPoint::Dialer`] `role_override`.
role_override: Endpoint,
},
/// The socket comes from a listener.
Listener {
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
},
}

impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override },
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => PendingPoint::Listener {
local_addr,
send_back_addr,
},
}
}
}

pub(crate) struct AsStrHashEq<T>(pub(crate) T);

impl<T: AsRef<str>> Eq for AsStrHashEq<T> {}

impl<T: AsRef<str>> PartialEq for AsStrHashEq<T> {
fn eq(&self, other: &Self) -> bool {
self.0.as_ref() == other.0.as_ref()
}
}

impl<T: AsRef<str>> std::hash::Hash for AsStrHashEq<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.as_ref().hash(state)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -848,6 +920,51 @@ mod tests {
assert_eq!(connection.handler.local_removed, vec![vec!["/foo"]]);
}

#[test]
#[ignore]
fn repoll_with_active_protocols() {
fn run_benchmark(protcol_count: usize, iters: usize) {
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved
// we need longer protocol names
macro_rules! protocols {
($($name:ident)*) => {
concat!(
$(
"/",
stringify!($name),
"ffffffffffffffffffffff ",
)*
)
}
}

const PROTOCOLS: &str = protocols!(a b c d e f g h i j k l m n o p q r s t u v);
let protocols = PROTOCOLS.split(' ').collect::<Vec<_>>();

let mut connection = Connection::new(
StreamMuxerBox::new(PendingStreamMuxer),
ConfigurableProtocolConnectionHandler::default(),
None,
0,
Duration::ZERO,
);

connection.handler.listen_on(&protocols[..protcol_count]);

let now = Instant::now();
for _ in 0..iters {
let _ = connection.poll_noop_waker();
}
let elapsed = now.elapsed();
println!("{protcol_count} {elapsed:?}");
}

let iters = 3_000_000;
run_benchmark(2, iters);
run_benchmark(4, iters);
run_benchmark(10, iters);
run_benchmark(20, iters);
jakubDoka marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
fn only_propagtes_actual_changes_to_remote_protocols_to_handler() {
let mut connection = Connection::new(
Expand Down Expand Up @@ -1315,39 +1432,3 @@ mod tests {
}
}
}

/// The endpoint roles associated with a pending peer-to-peer connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PendingPoint {
/// The socket comes from a dialer.
///
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer {
/// Same as [`ConnectedPoint::Dialer`] `role_override`.
role_override: Endpoint,
},
/// The socket comes from a listener.
Listener {
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
},
}

impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override },
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => PendingPoint::Listener {
local_addr,
send_back_addr,
},
}
}
}
20 changes: 12 additions & 8 deletions swarm/src/connection/supported_protocols.rs
Expand Up @@ -40,7 +40,6 @@ impl SupportedProtocols {
mod tests {
use super::*;
use crate::handler::{ProtocolsAdded, ProtocolsRemoved};
use once_cell::sync::Lazy;

#[test]
fn protocols_change_added_returns_correct_changed_value() {
Expand Down Expand Up @@ -70,19 +69,24 @@ mod tests {
}

fn add_foo() -> ProtocolsChange<'static> {
ProtocolsChange::Added(ProtocolsAdded::from_set(&FOO_PROTOCOLS))
ProtocolsChange::Added(ProtocolsAdded {
protocols: FOO_PROTOCOLS.iter(),
})
}

fn add_foo_bar() -> ProtocolsChange<'static> {
ProtocolsChange::Added(ProtocolsAdded::from_set(&FOO_BAR_PROTOCOLS))
ProtocolsChange::Added(ProtocolsAdded {
protocols: FOO_BAR_PROTOCOLS.iter(),
})
}

fn remove_foo() -> ProtocolsChange<'static> {
ProtocolsChange::Removed(ProtocolsRemoved::from_set(&FOO_PROTOCOLS))
ProtocolsChange::Removed(ProtocolsRemoved {
protocols: FOO_PROTOCOLS.iter(),
})
}

static FOO_PROTOCOLS: Lazy<HashSet<StreamProtocol>> =
Lazy::new(|| HashSet::from([StreamProtocol::new("/foo")]));
static FOO_BAR_PROTOCOLS: Lazy<HashSet<StreamProtocol>> =
Lazy::new(|| HashSet::from([StreamProtocol::new("/foo"), StreamProtocol::new("/bar")]));
static FOO_PROTOCOLS: &[StreamProtocol] = &[StreamProtocol::new("/foo")];
static FOO_BAR_PROTOCOLS: &[StreamProtocol] =
&[StreamProtocol::new("/foo"), StreamProtocol::new("/bar")];
}