Skip to content

Commit

Permalink
[WIP] add support for tls+noise and yamux+mplex
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Sep 17, 2023
1 parent aa19b1f commit 14c199f
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ env_logger = "0.10.0"
clap = { version = "4.1.6", features = ["derive"] }
tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] }

libp2p-mplex = { workspace = true }
libp2p-noise = { workspace = true }
libp2p-tcp = { workspace = true, features = ["tokio"] }

Expand Down
147 changes: 103 additions & 44 deletions libp2p/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use libp2p_core::upgrade::SelectUpgrade;
use libp2p_core::{muxing::StreamMuxerBox, Transport};
use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, StreamMuxer, UpgradeInfo};
use libp2p_identity::{Keypair, PeerId};
Expand Down Expand Up @@ -129,47 +130,38 @@ impl<Provider> SwarmBuilder<Provider, TcpPhase> {
self.with_tcp_config(Default::default())
}

pub fn with_tcp_2<D, SecU, E, MuxU, MuxE, MuxM>(
pub fn with_tcp_2<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
self,
security_upgrade: SecU,
multiplexer_upgrade: MuxU,
security_upgrade: SecUpgrade,
multiplexer_upgrade: MuxUpgrade,
) -> Result<
SwarmBuilder<Provider, QuicPhase<impl AuthenticatedMultiplexedTransport>>,
AuthenticationError,
>
where
D: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecU: IntoSecurityUpgrade,
SecU::Upgrade: InboundUpgrade<
Negotiated<libp2p_tcp::tokio::TcpStream>,
Output = (PeerId, D),
Error = E,
>,
<SecU::Upgrade as InboundUpgrade<Negotiated<libp2p_tcp::tokio::TcpStream>>>::Future: Send,
SecU::Upgrade: OutboundUpgrade<
Negotiated<libp2p_tcp::tokio::TcpStream>,
Output = (PeerId, D),
Error = E,
> + Clone
+ Send + 'static,

<SecU::Upgrade as OutboundUpgrade<Negotiated<libp2p_tcp::tokio::TcpStream>>>::Future: Send,
E: std::error::Error + Send + Sync + 'static,
<<<SecU as IntoSecurityUpgrade>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecU as IntoSecurityUpgrade>::Upgrade as UpgradeInfo>::Info: Send,

MuxM: StreamMuxer + Send + 'static,
MuxM::Substream: Send + 'static,
MuxM::Error: Send + Sync + 'static,

MuxU: IntoMultiplexerUpgrade,
MuxU::Upgrade: InboundUpgrade<Negotiated<D>, Output = MuxM, Error = MuxE>,
<MuxU::Upgrade as InboundUpgrade<Negotiated<D>>>::Future: Send,
MuxU::Upgrade: OutboundUpgrade<Negotiated<D>, Output = MuxM, Error = MuxE> + Clone + Send + 'static,
<MuxU::Upgrade as OutboundUpgrade<Negotiated<D>>>::Future: Send,
MuxE: std::error::Error + Send + Sync + 'static,
<<<MuxU as IntoMultiplexerUpgrade>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxU as IntoMultiplexerUpgrade>::Upgrade as UpgradeInfo>::Info: Send,
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<libp2p_tcp::tokio::TcpStream>,
SecUpgrade::Upgrade: Clone + Send + 'static,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<libp2p_tcp::tokio::TcpStream>, Output = (PeerId, SecStream), Error = SecError>,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_tcp::tokio::TcpStream>>>::Future: Send,
SecUpgrade::Upgrade: OutboundUpgrade<Negotiated<libp2p_tcp::tokio::TcpStream>, Output = (PeerId, SecStream), Error = SecError>,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_tcp::tokio::TcpStream>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::tokio::TcpStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<libp2p_tcp::tokio::TcpStream>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError>,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxUpgrade::Upgrade: OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError>,
MuxUpgrade::Upgrade: Clone + Send + 'static,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
{
Ok(SwarmBuilder {
phase: QuicPhase {
Expand All @@ -196,34 +188,59 @@ impl<Provider> SwarmBuilder<Provider, TcpPhase> {
}
}

pub trait IntoSecurityUpgrade {
pub trait IntoSecurityUpgrade<C> {
type Upgrade;

fn into_security_upgrade(self, keypair: &Keypair) -> Self::Upgrade;
}

impl<T, F> IntoSecurityUpgrade for F
impl<C, T, F> IntoSecurityUpgrade<C> for (F,)
where
F: for<'a> FnOnce(&'a Keypair) -> T,
{
type Upgrade = T;

fn into_security_upgrade(self, keypair: &Keypair) -> Self::Upgrade {
self(keypair)
self.0(keypair)
}
}

pub trait IntoMultiplexerUpgrade {
impl<F1, F2, T1, T2, C, O1, O2> IntoSecurityUpgrade<C> for (F1, F2)
where
F1: for<'a> FnOnce(&'a Keypair) -> T1,
F2: for<'a> FnOnce(&'a Keypair) -> T2,
T1: InboundUpgrade<Negotiated<C>, Output = (PeerId, O1)>,
T2: InboundUpgrade<Negotiated<C>, Output = (PeerId, O2)>,
{
type Upgrade = map::Upgrade<SelectUpgrade<T1, T2>, fn(futures::future::Either<<T1 as InboundUpgrade<Negotiated<C>>>::Output, <T2 as InboundUpgrade<Negotiated<C>>>::Output>) -> (PeerId, futures::future::Either<O1, O2>)>;

fn into_security_upgrade(self, keypair: &Keypair) -> Self::Upgrade {
map::Upgrade::new(
SelectUpgrade::new(self.0(keypair), self.1(keypair)),
|either |futures::future::Either::factor_first(either),
)
}
}

pub trait IntoMultiplexerUpgrade<C> {
type Upgrade;

fn into_multiplexer_upgrade(self) -> Self::Upgrade;
}

impl<U> IntoMultiplexerUpgrade for U {
type Upgrade = Self;
impl<C, U> IntoMultiplexerUpgrade<C> for (U,) {
type Upgrade = U;

fn into_multiplexer_upgrade(self) -> Self::Upgrade {
self.0
}
}

impl<C, U1, U2> IntoMultiplexerUpgrade<C> for (U1, U2) {
type Upgrade = SelectUpgrade<U1, U2>;

fn into_multiplexer_upgrade(self) -> Self::Upgrade {
self
SelectUpgrade::new(self.0, self.1)
}
}

Expand Down Expand Up @@ -1710,8 +1727,50 @@ mod tests {
.with_tokio()
.with_tcp_2(
// TODO: Handle unwrap
|keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(),
libp2p_yamux::Config::default(),
(|keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(),),
// TODO: The single tuple is not intuitive.
(libp2p_yamux::Config::default(),),
)
.unwrap()
.with_behaviour(|_| libp2p_swarm::dummy::Behaviour)
.unwrap()
.build();
}

#[test]
#[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))]
fn tcp_yamux_mplex() {
let _ = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp_2(
// TODO: Handle unwrap
(|keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(),),
(
libp2p_yamux::Config::default(),
libp2p_mplex::MplexConfig::default(),
),
)
.unwrap()
.with_behaviour(|_| libp2p_swarm::dummy::Behaviour)
.unwrap()
.build();
}

#[test]
#[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))]
fn tcp_tls_noise() {
let _ = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp_2(
// TODO: Handle unwrap
(
|keypair: &Keypair| libp2p_tls::Config::new(keypair).unwrap(),
|keypair: &Keypair| libp2p_noise::Config::new(keypair).unwrap(),
),
(
libp2p_yamux::Config::default(),
libp2p_mplex::MplexConfig::default(),
),
)
.unwrap()
.with_behaviour(|_| libp2p_swarm::dummy::Behaviour)
Expand Down
2 changes: 1 addition & 1 deletion libp2p/src/builder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};

/// Upgrade applying a function to an inner upgrade.
#[derive(Debug, Clone)]
pub(crate) struct Upgrade<U, F> {
pub struct Upgrade<U, F> {
upgrade: U,
fun: F,
}
Expand Down

0 comments on commit 14c199f

Please sign in to comment.