diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 2f61b5fe7a6..bded75920f8 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.1 - unreleased + +- Upgrade at most one inbound connect request. + # 0.3.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 2dfc08e497f..f3e813e2f8e 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-dcutr" edition = "2021" rust-version = "1.56.1" description = "Direct connection upgrade through relay" -version = "0.3.0" +version = "0.3.1" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index dfa8eede63b..9f9e2e01c13 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -22,7 +22,6 @@ use crate::protocol; use futures::future::{BoxFuture, FutureExt}; -use futures::stream::{FuturesUnordered, StreamExt}; use instant::Instant; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::multiaddr::Multiaddr; @@ -144,10 +143,9 @@ pub struct Handler { ::Error, >, >, - /// Inbound connects, accepted by the behaviour, pending completion. - inbound_connects: FuturesUnordered< - BoxFuture<'static, Result, protocol::inbound::UpgradeError>>, - >, + /// Inbound connect, accepted by the behaviour, pending completion. + inbound_connect: + Option, protocol::inbound::UpgradeError>>>, keep_alive: KeepAlive, } @@ -157,7 +155,7 @@ impl Handler { endpoint, pending_error: Default::default(), queued_events: Default::default(), - inbound_connects: Default::default(), + inbound_connect: Default::default(), keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)), } } @@ -247,8 +245,15 @@ impl ConnectionHandler for Handler { inbound_connect, obs_addrs, } => { - self.inbound_connects - .push(inbound_connect.accept(obs_addrs).boxed()); + if let Some(_) = self + .inbound_connect + .replace(inbound_connect.accept(obs_addrs).boxed()) + { + log::warn!( + "New inbound connect stream while still upgrading previous one. \ + Replacing previous with new.", + ); + } } Command::UpgradeFinishedDontKeepAlive => { self.keep_alive = KeepAlive::No; @@ -364,7 +369,8 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - while let Poll::Ready(Some(result)) = self.inbound_connects.poll_next_unpin(cx) { + if let Some(Poll::Ready(result)) = self.inbound_connect.as_mut().map(|f| f.poll_unpin(cx)) { + self.inbound_connect = None; match result { Ok(addresses) => { return Poll::Ready(ConnectionHandlerEvent::Custom(