Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/src/connection: Test max_negotiating_inbound_streams #2785

Merged
merged 8 commits into from Aug 16, 2022
2 changes: 2 additions & 0 deletions core/src/upgrade.rs
Expand Up @@ -64,6 +64,7 @@ mod error;
mod from_fn;
mod map;
mod optional;
mod pending;
mod select;
mod transfer;

Expand All @@ -77,6 +78,7 @@ pub use self::{
from_fn::{from_fn, FromFnUpgrade},
map::{MapInboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgrade, MapOutboundUpgradeErr},
optional::OptionalUpgrade,
pending::PendingUpgrade,
select::SelectUpgrade,
transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint},
};
Expand Down
76 changes: 76 additions & 0 deletions core/src/upgrade/pending.rs
@@ -0,0 +1,76 @@
// Copyright 2022 Protocol Labs.
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use futures::future;
use std::iter;
use void::Void;

/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always
/// returns a pending upgrade.
#[derive(Debug, Copy, Clone)]
pub struct PendingUpgrade<P> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this to be generic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to use a String instead? What if a user wants to use it with a type other than String that implements ProtocolName?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With "user" I mean am thinking of potential users beyond the test introduced in this pull request.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to use a String instead? What if a user wants to use it with a type other than String that implements ProtocolName?

&'static str would probably be my preference and I'd say we can make it generic once needed? For tests, a static protocol name is probably good enough for many if not all cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

libp2p-core has the ProtocolName abstraction. Thus I would suggest all of libp2p-core should use that abstraction. I am happy to design an alternative approach. Though I think that should happen in a different pull request.

protocol_name: P,
}

impl<P> PendingUpgrade<P> {
pub fn new(protocol_name: P) -> Self {
Self { protocol_name }
}
}

impl<P> UpgradeInfo for PendingUpgrade<P>
where
P: ProtocolName + Clone,
{
type Info = P;
type InfoIter = iter::Once<P>;

fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_name.clone())
}
}

impl<C, P> InboundUpgrade<C> for PendingUpgrade<P>
where
P: ProtocolName + Clone,
{
type Output = Void;
type Error = Void;
type Future = future::Pending<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future {
future::pending()
}
}

impl<C, P> OutboundUpgrade<C> for PendingUpgrade<P>
where
P: ProtocolName + Clone,
{
type Output = Void;
type Error = Void;
type Future = future::Pending<Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future {
future::pending()
}
}
79 changes: 79 additions & 0 deletions swarm/src/connection/handler_wrapper.rs
Expand Up @@ -440,3 +440,82 @@ pub enum Event<TOutboundOpenInfo, TCustom> {
/// Other event.
Custom(TCustom),
}

#[cfg(test)]
mod tests {
use super::*;
use crate::handler::PendingConnectionHandler;
use quickcheck::*;
use std::sync::Arc;

#[test]
fn max_negotiating_inbound_streams() {
fn prop(max_negotiating_inbound_streams: u8) {
let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();
let mut wrapper = HandlerWrapper::new(
PeerId::random(),
ConnectedPoint::Listener {
local_addr: Multiaddr::empty(),
send_back_addr: Multiaddr::empty(),
},
PendingConnectionHandler::new("test".to_string()),
None,
max_negotiating_inbound_streams,
);
let alive_substreams_counter = Arc::new(());

for _ in 0..max_negotiating_inbound_streams {
let substream =
SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
wrapper.inject_substream(substream, SubstreamEndpoint::Listener);
}

assert_eq!(
Arc::strong_count(&alive_substreams_counter),
max_negotiating_inbound_streams + 1,
"Expect none of the substreams up to the limit to be dropped."
);

let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone()));
wrapper.inject_substream(substream, SubstreamEndpoint::Listener);

assert_eq!(
Arc::strong_count(&alive_substreams_counter),
max_negotiating_inbound_streams + 1,
"Expect substream exceeding the limit to be dropped."
);
}

QuickCheck::new().quickcheck(prop as fn(_));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you ever considered using the #[quickcheck] macro?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't. Looks handy to me. Though I would prefer being consistent, i.e. move the entire code base to #[quickcheck] at once instead of diverging on a couple of pull requests.

}

struct PendingSubstream(Arc<()>);

impl AsyncRead for PendingSubstream {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Poll::Pending
}
}

impl AsyncWrite for PendingSubstream {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Poll::Pending
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Pending
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Pending
}
}
}
2 changes: 2 additions & 0 deletions swarm/src/handler.rs
Expand Up @@ -44,6 +44,7 @@ mod map_in;
mod map_out;
pub mod multi;
mod one_shot;
mod pending;
mod select;

pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
Expand All @@ -56,6 +57,7 @@ pub use dummy::DummyConnectionHandler;
pub use map_in::MapInEvent;
pub use map_out::MapOutEvent;
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
pub use pending::PendingConnectionHandler;
pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect};

/// A handler for a set of protocols used on a connection with a remote.
Expand Down
120 changes: 120 additions & 0 deletions swarm/src/handler/pending.rs
@@ -0,0 +1,120 @@
// Copyright 2022 Protocol Labs.
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use crate::NegotiatedSubstream;
use libp2p_core::{
upgrade::{InboundUpgrade, OutboundUpgrade, PendingUpgrade},
Multiaddr,
};
use std::task::{Context, Poll};
use void::Void;

/// Implementation of [`ConnectionHandler`] that returns a pending upgrade.
#[derive(Clone, Debug)]
pub struct PendingConnectionHandler {
protocol_name: String,
}

impl PendingConnectionHandler {
pub fn new(protocol_name: String) -> Self {
PendingConnectionHandler { protocol_name }
}
}

impl ConnectionHandler for PendingConnectionHandler {
type InEvent = Void;
type OutEvent = Void;
type Error = Void;
type InboundProtocol = PendingUpgrade<String>;
type OutboundProtocol = PendingUpgrade<String>;
type OutboundOpenInfo = Void;
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ())
}

fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
) {
void::unreachable(protocol)
}

fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::OutboundOpenInfo,
) {
void::unreachable(protocol);
#[allow(unreachable_code)]
{
void::unreachable(_info);
}
Comment on lines +70 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_info: Self::OutboundOpenInfo,
) {
void::unreachable(protocol);
#[allow(unreachable_code)]
{
void::unreachable(_info);
}
_: Self::OutboundOpenInfo,
) {
void::unreachable(protocol);

I'd almost suggest to not use void::unreachable for the info part if we have to use #[allow(unreachable_code)] for it to work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the additional void::unreachable, just in case we refactor this part of the codebase in the future.

}

fn inject_event(&mut self, v: Self::InEvent) {
void::unreachable(v)
}

fn inject_address_change(&mut self, _: &Multiaddr) {}

fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
}

fn inject_listen_upgrade_error(
&mut self,
_: Self::InboundOpenInfo,
_: ConnectionHandlerUpgrErr<
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
}

fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::No
}

fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
Poll::Pending
}
}