From ed02a10cf28832ae1f836ec5e469ea9958287e66 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 22 Oct 2023 08:11:28 +1100 Subject: [PATCH] refactor: migrate upgrades to `{In,Out}boundConnectionUpgrade` This needs to be done in one PR because we need to remove the blanket impl at the same time we add the new ones. I also chose to duplicate `SelectUpgrade` for the `libp2p::SwarmBuilder` because the `SelectUpgrade` within `libp2p-core` is really only meant to be used for protocol upgrade (which will go away at some point). This tiny bit of code duplication doesn't hurt and will help us in the future. Resolves: #4521. Pull-Request: #4695. --- core/CHANGELOG.md | 2 + core/src/upgrade.rs | 28 ------ core/tests/transport_upgrade.rs | 8 +- libp2p/src/builder.rs | 1 + libp2p/src/builder/phase.rs | 7 +- libp2p/src/builder/phase/other_transport.rs | 15 +-- libp2p/src/builder/phase/quic.rs | 25 ++--- libp2p/src/builder/phase/relay.rs | 13 +-- libp2p/src/builder/phase/tcp.rs | 28 +++--- libp2p/src/builder/phase/websocket.rs | 25 ++--- libp2p/src/builder/select_muxer.rs | 96 +++++++++++++++++++ libp2p/src/builder/select_security.rs | 14 +-- misc/webrtc-utils/src/noise.rs | 3 +- muxers/mplex/CHANGELOG.md | 2 + muxers/mplex/src/lib.rs | 6 +- muxers/test-harness/src/lib.rs | 11 ++- muxers/yamux/CHANGELOG.md | 2 + muxers/yamux/src/lib.rs | 6 +- transports/noise/CHANGELOG.md | 2 + transports/noise/src/lib.rs | 7 +- transports/noise/tests/smoke.rs | 3 +- .../noise/tests/webtransport_certhashes.rs | 2 +- transports/plaintext/CHANGELOG.md | 2 + transports/plaintext/src/lib.rs | 7 +- transports/plaintext/tests/smoke.rs | 2 +- transports/tls/CHANGELOG.md | 2 + transports/tls/src/upgrade.rs | 7 +- .../webtransport-websys/src/connection.rs | 3 +- 28 files changed, 213 insertions(+), 116 deletions(-) create mode 100644 libp2p/src/builder/select_muxer.rs diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 76cffdfcedd..007e00e7710 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.41.0 - unreleased +- Remove blanket-impl of `{In,Out}boundUpgrade` for `{In,Out}boundConnectionUpgrade`. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). ## 0.40.1 diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 7db1853b56c..777443822b7 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -157,31 +157,3 @@ pub trait OutboundConnectionUpgrade: UpgradeInfo { /// The `info` is the identifier of the protocol, as produced by `protocol_info`. fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future; } - -// Blanket implementation for InboundConnectionUpgrade based on InboundUpgrade for backwards compatibility -impl InboundConnectionUpgrade for U -where - U: InboundUpgrade, -{ - type Output = >::Output; - type Error = >::Error; - type Future = >::Future; - - fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future { - self.upgrade_inbound(socket, info) - } -} - -// Blanket implementation for OutboundConnectionUpgrade based on OutboundUpgrade for backwards compatibility -impl OutboundConnectionUpgrade for U -where - U: OutboundUpgrade, -{ - type Output = >::Output; - type Error = >::Error; - type Future = >::Future; - - fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future { - self.upgrade_outbound(socket, info) - } -} diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index 193ee73cbc8..8f151c886c7 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -20,7 +20,9 @@ use futures::prelude::*; use libp2p_core::transport::{ListenerId, MemoryTransport, Transport}; -use libp2p_core::upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{ + self, InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo, +}; use libp2p_identity as identity; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; @@ -40,7 +42,7 @@ impl UpgradeInfo for HelloUpgrade { } } -impl InboundUpgrade for HelloUpgrade +impl InboundConnectionUpgrade for HelloUpgrade where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { @@ -58,7 +60,7 @@ where } } -impl OutboundUpgrade for HelloUpgrade +impl OutboundConnectionUpgrade for HelloUpgrade where C: AsyncWrite + AsyncRead + Send + Unpin + 'static, { diff --git a/libp2p/src/builder.rs b/libp2p/src/builder.rs index 0dbeaa7e2ee..a51a269b133 100644 --- a/libp2p/src/builder.rs +++ b/libp2p/src/builder.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; mod phase; +mod select_muxer; mod select_security; /// Build a [`Swarm`](libp2p_swarm::Swarm) by combining an identity, a set of diff --git a/libp2p/src/builder/phase.rs b/libp2p/src/builder/phase.rs index dbf9eb883ae..4871adf65ca 100644 --- a/libp2p/src/builder/phase.rs +++ b/libp2p/src/builder/phase.rs @@ -25,10 +25,11 @@ use swarm::*; use tcp::*; use websocket::*; +use super::select_muxer::SelectMuxerUpgrade; use super::select_security::SelectSecurityUpgrade; use super::SwarmBuilder; -use libp2p_core::{muxing::StreamMuxerBox, upgrade::SelectUpgrade, Transport}; +use libp2p_core::{muxing::StreamMuxerBox, Transport}; use libp2p_identity::Keypair; pub trait IntoSecurityUpgrade { @@ -94,7 +95,7 @@ where U1: IntoMultiplexerUpgrade, U2: IntoMultiplexerUpgrade, { - type Upgrade = SelectUpgrade; + type Upgrade = SelectMuxerUpgrade; fn into_multiplexer_upgrade(self) -> Self::Upgrade { let (f1, f2) = self; @@ -102,7 +103,7 @@ where let u1 = f1.into_multiplexer_upgrade(); let u2 = f2.into_multiplexer_upgrade(); - SelectUpgrade::new(u1, u2) + SelectMuxerUpgrade::new(u1, u2) } } diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index 1453d2d097b..946b696323c 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -2,9 +2,10 @@ use std::convert::Infallible; use std::marker::PhantomData; use std::sync::Arc; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; use libp2p_core::Transport; #[cfg(feature = "relay")] -use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{Negotiated, UpgradeInfo}; #[cfg(feature = "relay")] use libp2p_identity::PeerId; @@ -119,9 +120,9 @@ impl SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, SecError: std::error::Error + Send + Sync + 'static, SecUpgrade: IntoSecurityUpgrade, - SecUpgrade::Upgrade: InboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -129,9 +130,9 @@ impl MuxStream::Substream: Send + 'static, MuxStream::Error: Send + Sync + 'static, MuxUpgrade: IntoMultiplexerUpgrade, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index 47eec66b8d2..ae8d9400c25 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -2,6 +2,7 @@ use super::*; use crate::SwarmBuilder; #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))] use libp2p_core::muxing::StreamMuxer; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; #[cfg(any( feature = "relay", all(not(target_arch = "wasm32"), feature = "websocket") @@ -90,9 +91,9 @@ impl SwarmBuilder, - SecUpgrade::Upgrade: InboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -100,9 +101,9 @@ impl SwarmBuilder, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -207,9 +208,9 @@ macro_rules! impl_quic_phase_with_websocket { SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, SecError: std::error::Error + Send + Sync + 'static, SecUpgrade: IntoSecurityUpgrade<$websocketStream>, - SecUpgrade::Upgrade: InboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -217,9 +218,9 @@ macro_rules! impl_quic_phase_with_websocket { MuxStream::Substream: Send + 'static, MuxStream::Error: Send + Sync + 'static, MuxUpgrade: IntoMultiplexerUpgrade, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, diff --git a/libp2p/src/builder/phase/relay.rs b/libp2p/src/builder/phase/relay.rs index 3062e828e07..2d47810ca9e 100644 --- a/libp2p/src/builder/phase/relay.rs +++ b/libp2p/src/builder/phase/relay.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; #[cfg(feature = "relay")] use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; #[cfg(feature = "relay")] use libp2p_core::Transport; #[cfg(any(feature = "relay", feature = "websocket"))] @@ -59,9 +60,9 @@ impl SwarmBuilder, - SecUpgrade::Upgrade: InboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -69,9 +70,9 @@ impl SwarmBuilder, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, diff --git a/libp2p/src/builder/phase/tcp.rs b/libp2p/src/builder/phase/tcp.rs index aee786c869d..5db7315d472 100644 --- a/libp2p/src/builder/phase/tcp.rs +++ b/libp2p/src/builder/phase/tcp.rs @@ -11,7 +11,9 @@ use libp2p_core::Transport; not(target_arch = "wasm32"), any(feature = "tcp", feature = "websocket") ))] -use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{ + upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo, +}; use std::marker::PhantomData; pub struct TcpPhase {} @@ -58,9 +60,9 @@ macro_rules! impl_tcp_builder { SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, SecError: std::error::Error + Send + Sync + 'static, SecUpgrade: IntoSecurityUpgrade, - SecUpgrade::Upgrade: InboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -68,9 +70,9 @@ macro_rules! impl_tcp_builder { MuxStream::Substream: Send + 'static, MuxStream::Error: Send + Sync + 'static, MuxUpgrade: IntoMultiplexerUpgrade, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -184,9 +186,9 @@ macro_rules! impl_tcp_phase_with_websocket { SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, SecError: std::error::Error + Send + Sync + 'static, SecUpgrade: IntoSecurityUpgrade<$websocketStream>, - SecUpgrade::Upgrade: InboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -194,9 +196,9 @@ macro_rules! impl_tcp_phase_with_websocket { MuxStream::Substream: Send + 'static, MuxStream::Error: Send + Sync + 'static, MuxUpgrade: IntoMultiplexerUpgrade, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, diff --git a/libp2p/src/builder/phase/websocket.rs b/libp2p/src/builder/phase/websocket.rs index 0415b3a3b4d..aeb6236a026 100644 --- a/libp2p/src/builder/phase/websocket.rs +++ b/libp2p/src/builder/phase/websocket.rs @@ -2,6 +2,7 @@ use super::*; use crate::SwarmBuilder; #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))] use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))] use libp2p_core::Transport; #[cfg(any( @@ -70,9 +71,9 @@ macro_rules! impl_websocket_builder { SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, SecError: std::error::Error + Send + Sync + 'static, SecUpgrade: IntoSecurityUpgrade<$websocketStream>, - SecUpgrade::Upgrade: InboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -80,9 +81,9 @@ macro_rules! impl_websocket_builder { MuxStream::Substream: Send + 'static, MuxStream::Error: Send + Sync + 'static, MuxUpgrade: IntoMultiplexerUpgrade, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -164,9 +165,9 @@ impl SwarmBuilder, - SecUpgrade::Upgrade: InboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + SecUpgrade::Upgrade: InboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, @@ -174,9 +175,9 @@ impl SwarmBuilder, - MuxUpgrade::Upgrade: InboundUpgrade, Output = MuxStream, Error = MuxError> + OutboundUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, - >>::Future: Send, - >>::Future: Send, + MuxUpgrade::Upgrade: InboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade, Output = MuxStream, Error = MuxError> + Clone + Send + 'static, + >>::Future: Send, + >>::Future: Send, MuxError: std::error::Error + Send + Sync + 'static, <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, <>::Upgrade as UpgradeInfo>::Info: Send, diff --git a/libp2p/src/builder/select_muxer.rs b/libp2p/src/builder/select_muxer.rs new file mode 100644 index 00000000000..5a2bdbf0864 --- /dev/null +++ b/libp2p/src/builder/select_muxer.rs @@ -0,0 +1,96 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use either::Either; +use futures::future; +use libp2p_core::either::EitherFuture; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::UpgradeInfo; +use std::iter::{Chain, Map}; + +#[derive(Debug, Clone)] +pub struct SelectMuxerUpgrade(A, B); + +impl SelectMuxerUpgrade { + pub fn new(a: A, b: B) -> Self { + SelectMuxerUpgrade(a, b) + } +} + +impl UpgradeInfo for SelectMuxerUpgrade +where + A: UpgradeInfo, + B: UpgradeInfo, +{ + type Info = Either; + type InfoIter = Chain< + Map<::IntoIter, fn(A::Info) -> Self::Info>, + Map<::IntoIter, fn(B::Info) -> Self::Info>, + >; + + fn protocol_info(&self) -> Self::InfoIter { + let a = self + .0 + .protocol_info() + .into_iter() + .map(Either::Left as fn(A::Info) -> _); + let b = self + .1 + .protocol_info() + .into_iter() + .map(Either::Right as fn(B::Info) -> _); + + a.chain(b) + } +} + +impl InboundConnectionUpgrade for SelectMuxerUpgrade +where + A: InboundConnectionUpgrade, + B: InboundConnectionUpgrade, +{ + type Output = future::Either; + type Error = Either; + type Future = EitherFuture; + + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + match info { + Either::Left(info) => EitherFuture::First(self.0.upgrade_inbound(sock, info)), + Either::Right(info) => EitherFuture::Second(self.1.upgrade_inbound(sock, info)), + } + } +} + +impl OutboundConnectionUpgrade for SelectMuxerUpgrade +where + A: OutboundConnectionUpgrade, + B: OutboundConnectionUpgrade, +{ + type Output = future::Either; + type Error = Either; + type Future = EitherFuture; + + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + match info { + Either::Left(info) => EitherFuture::First(self.0.upgrade_outbound(sock, info)), + Either::Right(info) => EitherFuture::Second(self.1.upgrade_outbound(sock, info)), + } + } +} diff --git a/libp2p/src/builder/select_security.rs b/libp2p/src/builder/select_security.rs index 91dbae869c6..4bae959b28b 100644 --- a/libp2p/src/builder/select_security.rs +++ b/libp2p/src/builder/select_security.rs @@ -23,7 +23,7 @@ use either::Either; use futures::future::MapOk; use futures::{future, TryFutureExt}; use libp2p_core::either::EitherFuture; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo}; use libp2p_identity::PeerId; use std::iter::{Chain, Map}; @@ -70,10 +70,10 @@ where } } -impl InboundUpgrade for SelectSecurityUpgrade +impl InboundConnectionUpgrade for SelectSecurityUpgrade where - A: InboundUpgrade, - B: InboundUpgrade, + A: InboundConnectionUpgrade, + B: InboundConnectionUpgrade, { type Output = (PeerId, future::Either); type Error = Either; @@ -91,10 +91,10 @@ where } } -impl OutboundUpgrade for SelectSecurityUpgrade +impl OutboundConnectionUpgrade for SelectSecurityUpgrade where - A: OutboundUpgrade, - B: OutboundUpgrade, + A: OutboundConnectionUpgrade, + B: OutboundConnectionUpgrade, { type Output = (PeerId, future::Either); type Error = Either; diff --git a/misc/webrtc-utils/src/noise.rs b/misc/webrtc-utils/src/noise.rs index 023766bc1df..ac2e58c9163 100644 --- a/misc/webrtc-utils/src/noise.rs +++ b/misc/webrtc-utils/src/noise.rs @@ -19,7 +19,8 @@ // DEALINGS IN THE SOFTWARE. use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::UpgradeInfo; use libp2p_identity as identity; use libp2p_identity::PeerId; use libp2p_noise as noise; diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 8cee61ced6e..1e50042e08a 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.41.0 - unreleased +- Migrate to `{In,Out}boundConnectionUpgrade` traits. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). ## 0.40.0 diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 81c5147af69..c67e0e3baec 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -32,7 +32,7 @@ use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo}; use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -45,7 +45,7 @@ impl UpgradeInfo for MplexConfig { } } -impl InboundUpgrade for MplexConfig +impl InboundConnectionUpgrade for MplexConfig where C: AsyncRead + AsyncWrite + Unpin, { @@ -61,7 +61,7 @@ where } } -impl OutboundUpgrade for MplexConfig +impl OutboundConnectionUpgrade for MplexConfig where C: AsyncRead + AsyncWrite + Unpin, { diff --git a/muxers/test-harness/src/lib.rs b/muxers/test-harness/src/lib.rs index 544e057c108..233fe3a478c 100644 --- a/muxers/test-harness/src/lib.rs +++ b/muxers/test-harness/src/lib.rs @@ -3,7 +3,8 @@ use futures::{future, AsyncRead, AsyncWrite}; use futures::{AsyncReadExt, Stream}; use futures::{AsyncWriteExt, StreamExt}; use libp2p_core::muxing::StreamMuxerExt; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, StreamMuxer, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::{StreamMuxer, UpgradeInfo}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -12,15 +13,15 @@ use std::{fmt, mem}; pub async fn connected_muxers_on_memory_ring_buffer() -> (M, M) where - MC: InboundUpgrade - + OutboundUpgrade + MC: InboundConnectionUpgrade + + OutboundConnectionUpgrade + Send + 'static + Default, ::Info: Send, <::InfoIter as IntoIterator>::IntoIter: Send, - >::Future: Send, - >::Future: Send, + >::Future: Send, + >::Future: Send, E: std::error::Error + Send + Sync + 'static, { let (alice, bob) = futures_ringbuf::Endpoint::pair(100, 100); diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index af761d76fe0..d9925596bad 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.45.0 - unreleased +- Migrate to `{In,Out}boundConnectionUpgrade` traits. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). ## 0.44.1 diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 12e5dd8c1ff..073a5723d2e 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,7 +24,7 @@ use futures::{future, prelude::*, ready}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo}; use std::collections::VecDeque; use std::io::{IoSlice, IoSliceMut}; use std::task::Waker; @@ -311,7 +311,7 @@ impl UpgradeInfo for Config { } } -impl InboundUpgrade for Config +impl InboundConnectionUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { @@ -325,7 +325,7 @@ where } } -impl OutboundUpgrade for Config +impl OutboundConnectionUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index c7f0308754a..e53d3a1077e 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.44.0 - unreleased +- Migrate to `{In,Out}boundConnectionUpgrade` traits. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). ## 0.43.2 diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index be73ea3f7f9..485f5d68155 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -65,7 +65,8 @@ use crate::handshake::State; use crate::io::handshake; use crate::protocol::{noise_params_into_builder, AuthenticKeypair, Keypair, PARAMS_XX}; use futures::prelude::*; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::UpgradeInfo; use libp2p_identity as identity; use libp2p_identity::PeerId; use multiaddr::Protocol; @@ -171,7 +172,7 @@ impl UpgradeInfo for Config { } } -impl InboundUpgrade for Config +impl InboundConnectionUpgrade for Config where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -195,7 +196,7 @@ where } } -impl OutboundUpgrade for Config +impl OutboundConnectionUpgrade for Config where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, { diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index 6d1723ec7d6..ffcf7934ac0 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -20,7 +20,8 @@ use futures::prelude::*; use libp2p_core::transport::{MemoryTransport, Transport}; -use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::upgrade; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; use libp2p_identity as identity; use libp2p_noise as noise; use log::info; diff --git a/transports/noise/tests/webtransport_certhashes.rs b/transports/noise/tests/webtransport_certhashes.rs index 95ce0bf58db..b3c924f8188 100644 --- a/transports/noise/tests/webtransport_certhashes.rs +++ b/transports/noise/tests/webtransport_certhashes.rs @@ -1,4 +1,4 @@ -use libp2p_core::{InboundUpgrade, OutboundUpgrade}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; use libp2p_identity as identity; use libp2p_noise as noise; use multihash::Multihash; diff --git a/transports/plaintext/CHANGELOG.md b/transports/plaintext/CHANGELOG.md index 8bb7dfd4809..d9aa850a807 100644 --- a/transports/plaintext/CHANGELOG.md +++ b/transports/plaintext/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.41.0 - unreleased +- Migrate to `{In,Out}boundConnectionUpgrade` traits. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). ## 0.40.1 diff --git a/transports/plaintext/src/lib.rs b/transports/plaintext/src/lib.rs index fa7cba6b8ff..1d771f9c143 100644 --- a/transports/plaintext/src/lib.rs +++ b/transports/plaintext/src/lib.rs @@ -27,7 +27,8 @@ use crate::error::Error; use bytes::Bytes; use futures::future::BoxFuture; use futures::prelude::*; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::UpgradeInfo; use libp2p_identity as identity; use libp2p_identity::PeerId; use libp2p_identity::PublicKey; @@ -77,7 +78,7 @@ impl UpgradeInfo for Config { } } -impl InboundUpgrade for Config +impl InboundConnectionUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { @@ -90,7 +91,7 @@ where } } -impl OutboundUpgrade for Config +impl OutboundConnectionUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { diff --git a/transports/plaintext/tests/smoke.rs b/transports/plaintext/tests/smoke.rs index ed18fb44cba..fd3350fb5aa 100644 --- a/transports/plaintext/tests/smoke.rs +++ b/transports/plaintext/tests/smoke.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::io::{AsyncReadExt, AsyncWriteExt}; -use libp2p_core::InboundUpgrade; +use libp2p_core::upgrade::InboundConnectionUpgrade; use libp2p_identity as identity; use libp2p_plaintext as plaintext; use log::debug; diff --git a/transports/tls/CHANGELOG.md b/transports/tls/CHANGELOG.md index c34b228f6c7..04793c719f0 100644 --- a/transports/tls/CHANGELOG.md +++ b/transports/tls/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.3.0 - unreleased +- Migrate to `{In,Out}boundConnectionUpgrade` traits. + See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695). ## 0.2.1 diff --git a/transports/tls/src/upgrade.rs b/transports/tls/src/upgrade.rs index bf64ce61505..463f2c3a323 100644 --- a/transports/tls/src/upgrade.rs +++ b/transports/tls/src/upgrade.rs @@ -24,7 +24,8 @@ use futures::future::BoxFuture; use futures::AsyncWrite; use futures::{AsyncRead, FutureExt}; use futures_rustls::TlsStream; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::UpgradeInfo; use libp2p_identity as identity; use libp2p_identity::PeerId; use rustls::{CommonState, ServerName}; @@ -67,7 +68,7 @@ impl UpgradeInfo for Config { } } -impl InboundUpgrade for Config +impl InboundConnectionUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { @@ -90,7 +91,7 @@ where } } -impl OutboundUpgrade for Config +impl OutboundConnectionUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { diff --git a/transports/webtransport-websys/src/connection.rs b/transports/webtransport-websys/src/connection.rs index 9ea1dbefd1c..982f9e5a32c 100644 --- a/transports/webtransport-websys/src/connection.rs +++ b/transports/webtransport-websys/src/connection.rs @@ -1,6 +1,7 @@ use futures::FutureExt; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use libp2p_core::{OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::OutboundConnectionUpgrade; +use libp2p_core::UpgradeInfo; use libp2p_identity::{Keypair, PeerId}; use multihash::Multihash; use send_wrapper::SendWrapper;