From 2acbb457cde137cb3a1f4fb2a8b8556e2f2a86f4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 11:48:46 +0200 Subject: [PATCH] swarm/: Limit negotiating inbound substreams per connection (#2697) This limit is shared across all `ConnectionHandler`s on a single connection. It only enforces a limit on the number of negotiating substreams. Once negotiated a `ConnectionHandler` manages the lifecycle of the substream and has to enforce limits themselves. --- swarm/CHANGELOG.md | 6 ++++++ swarm/Cargo.toml | 2 +- swarm/src/connection.rs | 7 ++++++- swarm/src/connection/handler_wrapper.rs | 20 ++++++++++++++++++++ swarm/src/connection/pool.rs | 21 +++++++++++++++++++++ swarm/src/lib.rs | 8 ++++++++ 6 files changed, 62 insertions(+), 2 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f44b84d52b8..6dcfdb7be92 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.36.1 - unreleased + +- Limit negotiating inbound substreams per connection. See [PR 2697]. + +[PR 2697]: https://github.com/libp2p/rust-libp2p/pull/2697 + # 0.36.0 - Don't require `Transport` to be `Clone`. See [PR 2529]. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 446b18b111b..ddd8c82c85b 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.56.1" description = "The libp2p swarm" -version = "0.36.0" +version = "0.36.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 42c29239cdc..f55f0367b8c 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -97,8 +97,13 @@ where muxer: StreamMuxerBox, handler: THandler, substream_upgrade_protocol_override: Option, + max_negotiating_inbound_streams: usize, ) -> Self { - let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override); + let wrapped_handler = HandlerWrapper::new( + handler, + substream_upgrade_protocol_override, + max_negotiating_inbound_streams, + ); Connection { muxing: Muxing::new(muxer), handler: wrapped_handler, diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index b10077d41b3..05e389d6134 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -77,6 +77,16 @@ where shutdown: Shutdown, /// The substream upgrade protocol override, if any. substream_upgrade_protocol_override: Option, + /// The maximum number of inbound streams concurrently negotiating on a + /// connection. New inbound streams exceeding the limit are dropped and thus + /// reset. + /// + /// Note: This only enforces a limit on the number of concurrently + /// negotiating inbound streams. The total number of inbound streams on a + /// connection is the sum of negotiating and negotiated streams. A limit on + /// the total number of streams can be enforced at the [`StreamMuxerBox`] + /// level. + max_negotiating_inbound_streams: usize, } impl std::fmt::Debug for HandlerWrapper { @@ -98,6 +108,7 @@ impl HandlerWrapper { pub(crate) fn new( handler: TConnectionHandler, substream_upgrade_protocol_override: Option, + max_negotiating_inbound_streams: usize, ) -> Self { Self { handler, @@ -107,6 +118,7 @@ impl HandlerWrapper { unique_dial_upgrade_id: 0, shutdown: Shutdown::None, substream_upgrade_protocol_override, + max_negotiating_inbound_streams, } } @@ -243,6 +255,14 @@ where ) { match endpoint { SubstreamEndpoint::Listener => { + if self.negotiating_in.len() == self.max_negotiating_inbound_streams { + log::warn!( + "Incoming substream exceeding maximum number of \ + negotiating inbound streams. Dropping." + ); + return; + } + let protocol = self.handler.listen_protocol(); let timeout = *protocol.timeout(); let (upgrade, user_data) = protocol.into_upgrade(); diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8186154c9b3..a8f28beeadc 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -87,6 +87,11 @@ where /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + max_negotiating_inbound_streams: usize, + /// The executor to use for running the background tasks. If `None`, /// the tasks are kept in `local_spawns` instead and polled on the /// current thread when the [`Pool`] is polled for new events. @@ -263,6 +268,7 @@ where task_command_buffer_size: config.task_command_buffer_size, dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, + max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, executor: config.executor, local_spawns: FuturesUnordered::new(), pending_connection_events_tx, @@ -744,6 +750,7 @@ where muxer, handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, + self.max_negotiating_inbound_streams, ); self.spawn( task::new_for_established_connection( @@ -1153,6 +1160,11 @@ pub struct PoolConfig { /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, + + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams]. + max_negotiating_inbound_streams: usize, } impl Default for PoolConfig { @@ -1164,6 +1176,7 @@ impl Default for PoolConfig { // By default, addresses of a single connection attempt are dialed in sequence. dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"), substream_upgrade_protocol_override: None, + max_negotiating_inbound_streams: 128, } } } @@ -1222,6 +1235,14 @@ impl PoolConfig { self.substream_upgrade_protocol_override = Some(v); self } + + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self { + self.max_negotiating_inbound_streams = v; + self + } } trait EntryExt<'a, K, V> { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 461e8c7d13f..90b158dbf1e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1364,6 +1364,14 @@ where self } + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [`PoolConfig::with_max_negotiating_inbound_streams`]. + pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self { + self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v); + self + } + /// Builds a `Swarm` with the current configuration. pub fn build(mut self) -> Swarm { let supported_protocols = self