Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/: Limit negotiating inbound substreams per connection #2697

Merged
merged 8 commits into from Jun 8, 2022
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
@@ -1,3 +1,7 @@
# 0.36.1 - unreleased

- Limit negotiating inbound substreams per connection.
mxinden marked this conversation as resolved.
Show resolved Hide resolved

# 0.36.0

- Don't require `Transport` to be `Clone`. See [PR 2529].
Expand Down
2 changes: 1 addition & 1 deletion swarm/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-swarm"
edition = "2021"
rust-version = "1.56.1"
description = "The libp2p swarm"
version = "0.36.0"
version = "0.36.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
7 changes: 6 additions & 1 deletion swarm/src/connection.rs
Expand Up @@ -97,8 +97,13 @@ where
muxer: StreamMuxerBox,
handler: THandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override);
let wrapped_handler = HandlerWrapper::new(
handler,
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
);
Connection {
muxing: Muxing::new(muxer),
handler: wrapped_handler,
Expand Down
20 changes: 20 additions & 0 deletions swarm/src/connection/handler_wrapper.rs
Expand Up @@ -77,6 +77,16 @@ where
shutdown: Shutdown,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
/// The maximum number of inbound streams concurrently negotiating on a
/// connection. New inbound streams exceeding the limit are dropped and thus
/// reset.
///
/// Note: This only enforces a limit on the number of concurrently
/// negotiating inbound streams. The total number of inbound streams on a
/// connection is the sum of negotiating and negotiated streams. A limit on
/// the total number of streams can be enforced at the [`StreamMuxerBox`]
/// level.
max_negotiating_inbound_streams: usize,
}

impl<TConnectionHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<TConnectionHandler> {
Expand All @@ -98,6 +108,7 @@ impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
pub(crate) fn new(
handler: TConnectionHandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
Self {
handler,
Expand All @@ -107,6 +118,7 @@ impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
unique_dial_upgrade_id: 0,
shutdown: Shutdown::None,
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
}
}

Expand Down Expand Up @@ -243,6 +255,14 @@ where
) {
match endpoint {
SubstreamEndpoint::Listener => {
if self.negotiating_in.len() == self.max_negotiating_inbound_streams {
log::warn!(
"Incoming substream exceeding maximum number of \
negotiating inbound streams. Dropping."
);
return;
}

let protocol = self.handler.listen_protocol();
let timeout = *protocol.timeout();
let (upgrade, user_data) = protocol.into_upgrade();
Expand Down
21 changes: 21 additions & 0 deletions swarm/src/connection/pool.rs
Expand Up @@ -87,6 +87,11 @@ where
/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`].
max_negotiating_inbound_streams: usize,

/// The executor to use for running the background tasks. If `None`,
/// the tasks are kept in `local_spawns` instead and polled on the
/// current thread when the [`Pool`] is polled for new events.
Expand Down Expand Up @@ -263,6 +268,7 @@ where
task_command_buffer_size: config.task_command_buffer_size,
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
executor: config.executor,
local_spawns: FuturesUnordered::new(),
pending_connection_events_tx,
Expand Down Expand Up @@ -744,6 +750,7 @@ where
muxer,
handler.into_handler(&obtained_peer_id, &endpoint),
self.substream_upgrade_protocol_override,
self.max_negotiating_inbound_streams,
);
self.spawn(
task::new_for_established_connection(
Expand Down Expand Up @@ -1153,6 +1160,11 @@ pub struct PoolConfig {

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams].
max_negotiating_inbound_streams: usize,
}

impl Default for PoolConfig {
Expand All @@ -1164,6 +1176,7 @@ impl Default for PoolConfig {
// By default, addresses of a single connection attempt are dialed in sequence.
dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"),
substream_upgrade_protocol_override: None,
max_negotiating_inbound_streams: 128,
}
}
}
Expand Down Expand Up @@ -1222,6 +1235,14 @@ impl PoolConfig {
self.substream_upgrade_protocol_override = Some(v);
self
}

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`].
pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.max_negotiating_inbound_streams = v;
self
}
}

trait EntryExt<'a, K, V> {
Expand Down
8 changes: 8 additions & 0 deletions swarm/src/lib.rs
Expand Up @@ -1364,6 +1364,14 @@ where
self
}

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`PoolConfig::with_max_negotiating_inbound_streams`].
pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
self
}

/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self
Expand Down