Skip to content

Commit

Permalink
swarm/: Limit negotiating inbound substreams per connection (#2697)
Browse files Browse the repository at this point in the history
This limit is shared across all `ConnectionHandler`s on a single connection. It
only enforces a limit on the number of negotiating substreams. Once negotiated a
`ConnectionHandler` manages the lifecycle of the substream and has to enforce
limits themselves.
  • Loading branch information
mxinden committed Jun 8, 2022
1 parent 59a74b4 commit 2acbb45
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 2 deletions.
6 changes: 6 additions & 0 deletions swarm/CHANGELOG.md
@@ -1,3 +1,9 @@
# 0.36.1 - unreleased

- Limit negotiating inbound substreams per connection. See [PR 2697].

[PR 2697]: https://github.com/libp2p/rust-libp2p/pull/2697

# 0.36.0

- Don't require `Transport` to be `Clone`. See [PR 2529].
Expand Down
2 changes: 1 addition & 1 deletion swarm/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-swarm"
edition = "2021"
rust-version = "1.56.1"
description = "The libp2p swarm"
version = "0.36.0"
version = "0.36.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
7 changes: 6 additions & 1 deletion swarm/src/connection.rs
Expand Up @@ -97,8 +97,13 @@ where
muxer: StreamMuxerBox,
handler: THandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override);
let wrapped_handler = HandlerWrapper::new(
handler,
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
);
Connection {
muxing: Muxing::new(muxer),
handler: wrapped_handler,
Expand Down
20 changes: 20 additions & 0 deletions swarm/src/connection/handler_wrapper.rs
Expand Up @@ -77,6 +77,16 @@ where
shutdown: Shutdown,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
/// The maximum number of inbound streams concurrently negotiating on a
/// connection. New inbound streams exceeding the limit are dropped and thus
/// reset.
///
/// Note: This only enforces a limit on the number of concurrently
/// negotiating inbound streams. The total number of inbound streams on a
/// connection is the sum of negotiating and negotiated streams. A limit on
/// the total number of streams can be enforced at the [`StreamMuxerBox`]
/// level.
max_negotiating_inbound_streams: usize,
}

impl<TConnectionHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<TConnectionHandler> {
Expand All @@ -98,6 +108,7 @@ impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
pub(crate) fn new(
handler: TConnectionHandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
max_negotiating_inbound_streams: usize,
) -> Self {
Self {
handler,
Expand All @@ -107,6 +118,7 @@ impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
unique_dial_upgrade_id: 0,
shutdown: Shutdown::None,
substream_upgrade_protocol_override,
max_negotiating_inbound_streams,
}
}

Expand Down Expand Up @@ -243,6 +255,14 @@ where
) {
match endpoint {
SubstreamEndpoint::Listener => {
if self.negotiating_in.len() == self.max_negotiating_inbound_streams {
log::warn!(
"Incoming substream exceeding maximum number of \
negotiating inbound streams. Dropping."
);
return;
}

let protocol = self.handler.listen_protocol();
let timeout = *protocol.timeout();
let (upgrade, user_data) = protocol.into_upgrade();
Expand Down
21 changes: 21 additions & 0 deletions swarm/src/connection/pool.rs
Expand Up @@ -87,6 +87,11 @@ where
/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`].
max_negotiating_inbound_streams: usize,

/// The executor to use for running the background tasks. If `None`,
/// the tasks are kept in `local_spawns` instead and polled on the
/// current thread when the [`Pool`] is polled for new events.
Expand Down Expand Up @@ -263,6 +268,7 @@ where
task_command_buffer_size: config.task_command_buffer_size,
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
executor: config.executor,
local_spawns: FuturesUnordered::new(),
pending_connection_events_tx,
Expand Down Expand Up @@ -744,6 +750,7 @@ where
muxer,
handler.into_handler(&obtained_peer_id, &endpoint),
self.substream_upgrade_protocol_override,
self.max_negotiating_inbound_streams,
);
self.spawn(
task::new_for_established_connection(
Expand Down Expand Up @@ -1153,6 +1160,11 @@ pub struct PoolConfig {

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams].
max_negotiating_inbound_streams: usize,
}

impl Default for PoolConfig {
Expand All @@ -1164,6 +1176,7 @@ impl Default for PoolConfig {
// By default, addresses of a single connection attempt are dialed in sequence.
dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"),
substream_upgrade_protocol_override: None,
max_negotiating_inbound_streams: 128,
}
}
}
Expand Down Expand Up @@ -1222,6 +1235,14 @@ impl PoolConfig {
self.substream_upgrade_protocol_override = Some(v);
self
}

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`].
pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.max_negotiating_inbound_streams = v;
self
}
}

trait EntryExt<'a, K, V> {
Expand Down
8 changes: 8 additions & 0 deletions swarm/src/lib.rs
Expand Up @@ -1364,6 +1364,14 @@ where
self
}

/// The maximum number of inbound streams concurrently negotiating on a connection.
///
/// See [`PoolConfig::with_max_negotiating_inbound_streams`].
pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
self
}

/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self
Expand Down

0 comments on commit 2acbb45

Please sign in to comment.