Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/identify: Allow at most one inbound identify push stream #2694

Merged
merged 2 commits into from Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions protocols/identify/CHANGELOG.md
@@ -1,3 +1,7 @@
# 0.36.1 - unreleased

- Allow at most one inbound identify push stream.

# 0.36.0

- Update to `libp2p-core` `v0.33.0`.
Expand Down
3 changes: 2 additions & 1 deletion protocols/identify/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-identify"
edition = "2021"
rust-version = "1.56.1"
description = "Nodes identifcation protocol for libp2p"
version = "0.36.0"
version = "0.36.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -22,6 +22,7 @@ prost-codec = { version = "0.1", path = "../../misc/prost-codec" }
prost = "0.10"
smallvec = "1.6.1"
thiserror = "1.0"
void = "1.0"

[dev-dependencies]
async-std = "1.6.2"
Expand Down
35 changes: 30 additions & 5 deletions protocols/identify/src/handler.rs
Expand Up @@ -22,6 +22,7 @@ use crate::protocol::{
IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush,
ReplySubstream, UpgradeError,
};
use futures::future::BoxFuture;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p_core::either::{EitherError, EitherOutput};
Expand All @@ -30,6 +31,7 @@ use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
};
use log::warn;
use smallvec::SmallVec;
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};

Expand All @@ -39,6 +41,7 @@ use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct IdentifyHandler {
inbound_identify_push: Option<BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<
Expand Down Expand Up @@ -80,6 +83,7 @@ impl IdentifyHandler {
/// Creates a new `IdentifyHandler`.
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
IdentifyHandler {
inbound_identify_push: Default::default(),
events: SmallVec::new(),
trigger_next_identify: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes,
Expand Down Expand Up @@ -113,9 +117,14 @@ impl ConnectionHandler for IdentifyHandler {
EitherOutput::First(substream) => self.events.push(ConnectionHandlerEvent::Custom(
IdentifyHandlerEvent::Identify(substream),
)),
EitherOutput::Second(info) => self.events.push(ConnectionHandlerEvent::Custom(
IdentifyHandlerEvent::Identified(info),
)),
EitherOutput::Second(fut) => {
if self.inbound_identify_push.replace(fut).is_some() {
warn!(
"New inbound identify push stream while still upgrading previous one. \
Replacing previous with new.",
);
}
}
}
}

Expand Down Expand Up @@ -189,14 +198,30 @@ impl ConnectionHandler for IdentifyHandler {

// Poll the future that fires when we need to identify the node again.
match Future::poll(Pin::new(&mut self.trigger_next_identify), cx) {
Poll::Pending => Poll::Pending,
Poll::Pending => {}
Poll::Ready(()) => {
self.trigger_next_identify.reset(self.interval);
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()),
};
Poll::Ready(ev)
return Poll::Ready(ev);
}
}

if let Some(Poll::Ready(res)) = self
.inbound_identify_push
.as_mut()
.map(|f| f.poll_unpin(cx))
{
self.inbound_identify_push.take();

if let Ok(info) = res {
return Poll::Ready(ConnectionHandlerEvent::Custom(
IdentifyHandlerEvent::Identified(info),
));
}
}

Poll::Pending
}
}
12 changes: 7 additions & 5 deletions protocols/identify/src/protocol.rs
Expand Up @@ -20,7 +20,7 @@

use crate::structs_proto;
use asynchronous_codec::{FramedRead, FramedWrite};
use futures::prelude::*;
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{
identity, multiaddr,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
Expand All @@ -30,6 +30,7 @@ use log::trace;
use std::convert::TryFrom;
use std::{fmt, io, iter, pin::Pin};
use thiserror::Error;
use void::Void;

const MAX_MESSAGE_SIZE_BYTES: usize = 4096;

Expand Down Expand Up @@ -143,12 +144,13 @@ impl<C> InboundUpgrade<C> for IdentifyPushProtocol<InboundPush>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = IdentifyInfo;
type Error = UpgradeError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Output = BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>;
type Error = Void;
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
recv(socket).boxed()
// Lazily upgrade stream, thus allowing upgrade to happen within identify's handler.
future::ok(recv(socket).boxed())
}
}

Expand Down