From 1425886638541147d86acb7b44d1b547aafbf2a0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 May 2022 11:38:50 +0200 Subject: [PATCH 1/7] swarm/src/connection: Limit # of negotiating inbound substreams This limit is shared across all `ConnectionHandler`s on a single connection. It only enforces a limit on the number of negotiating substreams. Once negotiated a `ConnectionHandler` manages the lifecycle of the substream and has to enforce limits themselves. --- swarm/src/connection/handler_wrapper.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index b10077d41b3..593475eb85e 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -35,6 +35,10 @@ use libp2p_core::{ }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; +/// The maximum number of incoming streams concurrently negotiated. Streams are +/// dropped and thus reset. +const MAX_NUM_NEGOTIATING_IN: usize = 128; + /// A wrapper for an underlying [`ConnectionHandler`]. /// /// It extends [`ConnectionHandler`] with: @@ -243,6 +247,11 @@ where ) { match endpoint { SubstreamEndpoint::Listener => { + if self.negotiating_in.len() == MAX_NUM_NEGOTIATING_IN { + log::warn!("Incoming substream exceeding `MAX_NUM_NEGOTIATING_IN`. Dropping.",); + return; + } + let protocol = self.handler.listen_protocol(); let timeout = *protocol.timeout(); let (upgrade, user_data) = protocol.into_upgrade(); From a98cf045b8644b7d56ccb0cef00ca929b96b2add Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 May 2022 13:35:43 +0200 Subject: [PATCH 2/7] swarm/src/: Make max number of negotiating inbound streams configurable --- swarm/src/connection.rs | 7 ++++++- swarm/src/connection/handler_wrapper.rs | 22 ++++++++++++++++------ swarm/src/connection/pool.rs | 21 +++++++++++++++++++++ swarm/src/lib.rs | 10 ++++++++++ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 42c29239cdc..611fa68df59 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -97,8 +97,13 @@ where muxer: StreamMuxerBox, handler: THandler, substream_upgrade_protocol_override: Option, + max_number_negotiating_inbound_streams: usize, ) -> Self { - let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override); + let wrapped_handler = HandlerWrapper::new( + handler, + substream_upgrade_protocol_override, + max_number_negotiating_inbound_streams, + ); Connection { muxing: Muxing::new(muxer), handler: wrapped_handler, diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 593475eb85e..3b7a99ca48d 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -35,10 +35,6 @@ use libp2p_core::{ }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; -/// The maximum number of incoming streams concurrently negotiated. Streams are -/// dropped and thus reset. -const MAX_NUM_NEGOTIATING_IN: usize = 128; - /// A wrapper for an underlying [`ConnectionHandler`]. /// /// It extends [`ConnectionHandler`] with: @@ -81,6 +77,15 @@ where shutdown: Shutdown, /// The substream upgrade protocol override, if any. substream_upgrade_protocol_override: Option, + /// The maximum number of inbound streams concurrently negotiating. New inbound + /// streams exceeding the limit are dropped and thus reset. + /// + /// Note: This only enforces a limit on the number of concurrently + /// negotiating inbound streams. The total number of inbound streams on a + /// connection is the sum of negotiating and negotiated streams. A limit on + /// the total number of streams can be enforced at the [`StreamMuxerBox`] + /// level. + max_number_negotiating_inbound_streams: usize, } impl std::fmt::Debug for HandlerWrapper { @@ -102,6 +107,7 @@ impl HandlerWrapper { pub(crate) fn new( handler: TConnectionHandler, substream_upgrade_protocol_override: Option, + max_number_negotiating_inbound_streams: usize, ) -> Self { Self { handler, @@ -111,6 +117,7 @@ impl HandlerWrapper { unique_dial_upgrade_id: 0, shutdown: Shutdown::None, substream_upgrade_protocol_override, + max_number_negotiating_inbound_streams, } } @@ -247,8 +254,11 @@ where ) { match endpoint { SubstreamEndpoint::Listener => { - if self.negotiating_in.len() == MAX_NUM_NEGOTIATING_IN { - log::warn!("Incoming substream exceeding `MAX_NUM_NEGOTIATING_IN`. Dropping.",); + if self.negotiating_in.len() == self.max_number_negotiating_inbound_streams { + log::warn!( + "Incoming substream exceeding maximum number of \ + negotiating inbound streams. Dropping." + ); return; } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8186154c9b3..ad340398719 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -87,6 +87,11 @@ where /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [super::handler_wrapper::HandlerWrapper::max_num_negotiating_inbound_streams]. + max_number_negotiating_inbound_streams: usize, + /// The executor to use for running the background tasks. If `None`, /// the tasks are kept in `local_spawns` instead and polled on the /// current thread when the [`Pool`] is polled for new events. @@ -263,6 +268,7 @@ where task_command_buffer_size: config.task_command_buffer_size, dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, + max_number_negotiating_inbound_streams: config.max_number_negotiating_inbound_streams, executor: config.executor, local_spawns: FuturesUnordered::new(), pending_connection_events_tx, @@ -744,6 +750,7 @@ where muxer, handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, + self.max_number_negotiating_inbound_streams, ); self.spawn( task::new_for_established_connection( @@ -1153,6 +1160,11 @@ pub struct PoolConfig { /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, + + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [super::handler_wrapper::HandlerWrapper::max_number_negotiating_inbound_streams]. + max_number_negotiating_inbound_streams: usize, } impl Default for PoolConfig { @@ -1164,6 +1176,7 @@ impl Default for PoolConfig { // By default, addresses of a single connection attempt are dialed in sequence. dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"), substream_upgrade_protocol_override: None, + max_number_negotiating_inbound_streams: 128, } } } @@ -1222,6 +1235,14 @@ impl PoolConfig { self.substream_upgrade_protocol_override = Some(v); self } + + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [`super::handler_wrapper::HandlerWrapper::max_num_negotiating_inbound_streams`]. + pub fn with_max_number_negotiating_inbound_streams(mut self, v: usize) -> Self { + self.max_number_negotiating_inbound_streams = v; + self + } } trait EntryExt<'a, K, V> { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 461e8c7d13f..c618fac93f3 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1364,6 +1364,16 @@ where self } + /// The maximum number of inbound streams concurrently negotiating on a connection. + /// + /// See [`PoolConfig::with_max_number_negotiating_inbound_streams`]. + pub fn max_number_negotiating_inbound_streams(mut self, v: usize) -> Self { + self.pool_config = self + .pool_config + .with_max_number_negotiating_inbound_streams(v); + self + } + /// Builds a `Swarm` with the current configuration. pub fn build(mut self) -> Swarm { let supported_protocols = self From 2c97c014ad0c425fe101ee574307b2c4d74e3784 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 Jun 2022 13:47:00 +0200 Subject: [PATCH 3/7] swarm/src/connection: Document that limit applies per connection --- swarm/src/connection/handler_wrapper.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 3b7a99ca48d..979c1930d53 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -77,8 +77,9 @@ where shutdown: Shutdown, /// The substream upgrade protocol override, if any. substream_upgrade_protocol_override: Option, - /// The maximum number of inbound streams concurrently negotiating. New inbound - /// streams exceeding the limit are dropped and thus reset. + /// The maximum number of inbound streams concurrently negotiating on a + /// connection. New inbound streams exceeding the limit are dropped and thus + /// reset. /// /// Note: This only enforces a limit on the number of concurrently /// negotiating inbound streams. The total number of inbound streams on a From 8c08b3051d598e4b3fefa2801ab7cae081a386ed Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 Jun 2022 13:50:28 +0200 Subject: [PATCH 4/7] swarm/: Bump version and add changelog entry --- swarm/CHANGELOG.md | 4 ++++ swarm/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f44b84d52b8..c7ac99f31c8 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.36.1 - unreleased + +- Limit negotiating inbound substreams per connection. + # 0.36.0 - Don't require `Transport` to be `Clone`. See [PR 2529]. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 446b18b111b..ddd8c82c85b 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.56.1" description = "The libp2p swarm" -version = "0.36.0" +version = "0.36.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 7a3fb1881944ad51eeb1b38d3db6956c4db3c29a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 Jun 2022 14:55:25 +0200 Subject: [PATCH 5/7] swarm/: Fix intra doc link --- swarm/src/connection/pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index ad340398719..db97334c934 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -89,7 +89,7 @@ where /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [super::handler_wrapper::HandlerWrapper::max_num_negotiating_inbound_streams]. + /// See [`super::handler_wrapper::HandlerWrapper::max_number_negotiating_inbound_streams`]. max_number_negotiating_inbound_streams: usize, /// The executor to use for running the background tasks. If `None`, @@ -1238,7 +1238,7 @@ impl PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_num_negotiating_inbound_streams`]. + /// See [`super::handler_wrapper::HandlerWrapper::max_number_negotiating_inbound_streams`]. pub fn with_max_number_negotiating_inbound_streams(mut self, v: usize) -> Self { self.max_number_negotiating_inbound_streams = v; self From 2b187af6507f29a552e3dfd0eff026aee3fd3317 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 09:30:10 +0200 Subject: [PATCH 6/7] swarm/: Rename to max_negotiating_inbound_streams --- swarm/src/connection.rs | 4 ++-- swarm/src/connection/handler_wrapper.rs | 8 ++++---- swarm/src/connection/pool.rs | 20 ++++++++++---------- swarm/src/lib.rs | 8 +++----- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 611fa68df59..f55f0367b8c 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -97,12 +97,12 @@ where muxer: StreamMuxerBox, handler: THandler, substream_upgrade_protocol_override: Option, - max_number_negotiating_inbound_streams: usize, + max_negotiating_inbound_streams: usize, ) -> Self { let wrapped_handler = HandlerWrapper::new( handler, substream_upgrade_protocol_override, - max_number_negotiating_inbound_streams, + max_negotiating_inbound_streams, ); Connection { muxing: Muxing::new(muxer), diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 979c1930d53..05e389d6134 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -86,7 +86,7 @@ where /// connection is the sum of negotiating and negotiated streams. A limit on /// the total number of streams can be enforced at the [`StreamMuxerBox`] /// level. - max_number_negotiating_inbound_streams: usize, + max_negotiating_inbound_streams: usize, } impl std::fmt::Debug for HandlerWrapper { @@ -108,7 +108,7 @@ impl HandlerWrapper { pub(crate) fn new( handler: TConnectionHandler, substream_upgrade_protocol_override: Option, - max_number_negotiating_inbound_streams: usize, + max_negotiating_inbound_streams: usize, ) -> Self { Self { handler, @@ -118,7 +118,7 @@ impl HandlerWrapper { unique_dial_upgrade_id: 0, shutdown: Shutdown::None, substream_upgrade_protocol_override, - max_number_negotiating_inbound_streams, + max_negotiating_inbound_streams, } } @@ -255,7 +255,7 @@ where ) { match endpoint { SubstreamEndpoint::Listener => { - if self.negotiating_in.len() == self.max_number_negotiating_inbound_streams { + if self.negotiating_in.len() == self.max_negotiating_inbound_streams { log::warn!( "Incoming substream exceeding maximum number of \ negotiating inbound streams. Dropping." diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index db97334c934..a8f28beeadc 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -89,8 +89,8 @@ where /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_number_negotiating_inbound_streams`]. - max_number_negotiating_inbound_streams: usize, + /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + max_negotiating_inbound_streams: usize, /// The executor to use for running the background tasks. If `None`, /// the tasks are kept in `local_spawns` instead and polled on the @@ -268,7 +268,7 @@ where task_command_buffer_size: config.task_command_buffer_size, dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, - max_number_negotiating_inbound_streams: config.max_number_negotiating_inbound_streams, + max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, executor: config.executor, local_spawns: FuturesUnordered::new(), pending_connection_events_tx, @@ -750,7 +750,7 @@ where muxer, handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, - self.max_number_negotiating_inbound_streams, + self.max_negotiating_inbound_streams, ); self.spawn( task::new_for_established_connection( @@ -1163,8 +1163,8 @@ pub struct PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [super::handler_wrapper::HandlerWrapper::max_number_negotiating_inbound_streams]. - max_number_negotiating_inbound_streams: usize, + /// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams]. + max_negotiating_inbound_streams: usize, } impl Default for PoolConfig { @@ -1176,7 +1176,7 @@ impl Default for PoolConfig { // By default, addresses of a single connection attempt are dialed in sequence. dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"), substream_upgrade_protocol_override: None, - max_number_negotiating_inbound_streams: 128, + max_negotiating_inbound_streams: 128, } } } @@ -1238,9 +1238,9 @@ impl PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_number_negotiating_inbound_streams`]. - pub fn with_max_number_negotiating_inbound_streams(mut self, v: usize) -> Self { - self.max_number_negotiating_inbound_streams = v; + /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self { + self.max_negotiating_inbound_streams = v; self } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index c618fac93f3..90b158dbf1e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1366,11 +1366,9 @@ where /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`PoolConfig::with_max_number_negotiating_inbound_streams`]. - pub fn max_number_negotiating_inbound_streams(mut self, v: usize) -> Self { - self.pool_config = self - .pool_config - .with_max_number_negotiating_inbound_streams(v); + /// See [`PoolConfig::with_max_negotiating_inbound_streams`]. + pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self { + self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v); self } From baabefaed12044a2ec935e7f2d2b837ae5e8ac4d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 11:25:30 +0200 Subject: [PATCH 7/7] Update swarm/CHANGELOG.md --- swarm/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index c7ac99f31c8..6dcfdb7be92 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,6 +1,8 @@ # 0.36.1 - unreleased -- Limit negotiating inbound substreams per connection. +- Limit negotiating inbound substreams per connection. See [PR 2697]. + +[PR 2697]: https://github.com/libp2p/rust-libp2p/pull/2697 # 0.36.0