From 676a630875403ba0cb02abc094114e003d122e70 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 Jun 2022 13:42:34 +0200 Subject: [PATCH] protocols/identify: Allow at most one inbound identify push stream (#2694) An identify push contains the whole identify information of a remote peer. Upgrading multiple inbound identify push streams is useless. Instead older streams are dropped in favor of newer streams. --- protocols/identify/CHANGELOG.md | 4 ++++ protocols/identify/Cargo.toml | 3 ++- protocols/identify/src/handler.rs | 35 +++++++++++++++++++++++++----- protocols/identify/src/protocol.rs | 12 +++++----- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index d48a7ff822c..b9766f1f6ae 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.36.1 - unreleased + +- Allow at most one inbound identify push stream. + # 0.36.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index d16a35b92e7..7c6bf842bec 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-identify" edition = "2021" rust-version = "1.56.1" description = "Nodes identifcation protocol for libp2p" -version = "0.36.0" +version = "0.36.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -22,6 +22,7 @@ prost-codec = { version = "0.1", path = "../../misc/prost-codec" } prost = "0.10" smallvec = "1.6.1" thiserror = "1.0" +void = "1.0" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 748102df679..461e05a31e8 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -22,6 +22,7 @@ use crate::protocol::{ IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush, ReplySubstream, UpgradeError, }; +use futures::future::BoxFuture; use futures::prelude::*; use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; @@ -30,6 +31,7 @@ use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; +use log::warn; use smallvec::SmallVec; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; @@ -39,6 +41,7 @@ use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; /// at least one identification request to be answered by the remote before /// permitting the underlying connection to be closed. pub struct IdentifyHandler { + inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< [ConnectionHandlerEvent< @@ -80,6 +83,7 @@ impl IdentifyHandler { /// Creates a new `IdentifyHandler`. pub fn new(initial_delay: Duration, interval: Duration) -> Self { IdentifyHandler { + inbound_identify_push: Default::default(), events: SmallVec::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, @@ -113,9 +117,14 @@ impl ConnectionHandler for IdentifyHandler { EitherOutput::First(substream) => self.events.push(ConnectionHandlerEvent::Custom( IdentifyHandlerEvent::Identify(substream), )), - EitherOutput::Second(info) => self.events.push(ConnectionHandlerEvent::Custom( - IdentifyHandlerEvent::Identified(info), - )), + EitherOutput::Second(fut) => { + if self.inbound_identify_push.replace(fut).is_some() { + warn!( + "New inbound identify push stream while still upgrading previous one. \ + Replacing previous with new.", + ); + } + } } } @@ -189,14 +198,30 @@ impl ConnectionHandler for IdentifyHandler { // Poll the future that fires when we need to identify the node again. match Future::poll(Pin::new(&mut self.trigger_next_identify), cx) { - Poll::Pending => Poll::Pending, + Poll::Pending => {} Poll::Ready(()) => { self.trigger_next_identify.reset(self.interval); let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()), }; - Poll::Ready(ev) + return Poll::Ready(ev); + } + } + + if let Some(Poll::Ready(res)) = self + .inbound_identify_push + .as_mut() + .map(|f| f.poll_unpin(cx)) + { + self.inbound_identify_push.take(); + + if let Ok(info) = res { + return Poll::Ready(ConnectionHandlerEvent::Custom( + IdentifyHandlerEvent::Identified(info), + )); } } + + Poll::Pending } } diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 439412d5ca2..0fd50460e2d 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -20,7 +20,7 @@ use crate::structs_proto; use asynchronous_codec::{FramedRead, FramedWrite}; -use futures::prelude::*; +use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ identity, multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, @@ -30,6 +30,7 @@ use log::trace; use std::convert::TryFrom; use std::{fmt, io, iter, pin::Pin}; use thiserror::Error; +use void::Void; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; @@ -143,12 +144,13 @@ impl InboundUpgrade for IdentifyPushProtocol where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = IdentifyInfo; - type Error = UpgradeError; - type Future = Pin> + Send>>; + type Output = BoxFuture<'static, Result>; + type Error = Void; + type Future = future::Ready>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - recv(socket).boxed() + // Lazily upgrade stream, thus allowing upgrade to happen within identify's handler. + future::ok(recv(socket).boxed()) } }