Skip to content

Commit

Permalink
core/src/transport: Poll Transport directly, remove Transport::Listen…
Browse files Browse the repository at this point in the history
…er (#2652)

Remove the concept of individual `Transport::Listener` streams from `Transport`.
Instead the `Transport` is polled directly via `Transport::poll`. The
`Transport` is now responsible for driving its listeners.
  • Loading branch information
elenaf9 committed Jul 4, 2022
1 parent b28cdb3 commit 62622a1
Show file tree
Hide file tree
Showing 59 changed files with 2,070 additions and 2,020 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -35,6 +35,7 @@ default = [
"websocket",
"yamux",
]

autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand Down
5 changes: 5 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -5,9 +5,14 @@
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].

- Remove the concept of individual `Transport::Listener` streams from `Transport`.
Instead the `Transport` is polled directly via `Transport::poll`. The
`Transport` is now responsible for driving its listeners. See [PR 2652].

[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710
[PR 2652]: https://github.com/libp2p/rust-libp2p/pull/2652

# 0.33.0

Expand Down
19 changes: 0 additions & 19 deletions core/src/connection.rs
Expand Up @@ -43,25 +43,6 @@ impl std::ops::Add<usize> for ConnectionId {
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
108 changes: 46 additions & 62 deletions core/src/either.rs
Expand Up @@ -20,7 +20,7 @@

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
use futures::{
Expand Down Expand Up @@ -274,48 +274,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
First(#[pin] A),
Second(#[pin] B),
}

impl<AStream, BStream, AInner, BInner, AError, BError> Stream
for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
{
type Item = Result<
ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>,
EitherError<AError, BError>,
>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::First)
.map_err(EitherError::A)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::Second)
.map_err(EitherError::B)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
},
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -385,11 +343,12 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
}
}
}

#[derive(Debug, Copy, Clone)]
#[pin_project(project = EitherTransportProj)]
#[derive(Debug)]
#[must_use = "transports do nothing unless polled"]
pub enum EitherTransport<A, B> {
Left(A),
Right(B),
Left(#[pin] A),
Right(#[pin] B),
}

impl<A, B> Transport for EitherTransport<A, B>
Expand All @@ -399,29 +358,54 @@ where
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::First(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match self.project() {
EitherTransportProj::Left(a) => match a.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::First)
.map_err(EitherError::A),
),
},
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::Second(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
EitherTransportProj::Right(b) => match b.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::Second)
.map_err(EitherError::B),
),
},
}
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
match self {
EitherTransport::Left(t) => t.remove_listener(id),
EitherTransport::Right(t) => t.remove_listener(id),
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::A(err)),
}),
EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::B(err)),
}),
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
Expand Down

0 comments on commit 62622a1

Please sign in to comment.