Skip to content

Commit

Permalink
core/muxing: Flatten StreamMuxer interface to `poll_{inbound,outbou…
Browse files Browse the repository at this point in the history
…nd,address_change,close}` (#2724)

Instead of having a mix of `poll_event`, `poll_outbound` and `poll_close`, we
flatten the entire interface of `StreamMuxer` into 4 individual functions:

- `poll_inbound`
- `poll_outbound`
- `poll_address_change`
- `poll_close`

This design is closer to the design of other async traits like `AsyncRead` and
`AsyncWrite`. It also allows us to delete the `StreamMuxerEvent`.
  • Loading branch information
thomaseizinger committed Jul 18, 2022
1 parent d4f8ec2 commit 1a553db
Show file tree
Hide file tree
Showing 61 changed files with 267 additions and 682 deletions.
20 changes: 10 additions & 10 deletions Cargo.toml
Expand Up @@ -78,37 +78,37 @@ instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"

libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true }
libp2p-core = { version = "0.34.0", path = "core", default-features = false }
libp2p-core = { version = "0.35.0", path = "core", default-features = false }
libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true }
libp2p-mplex = { version = "0.35.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.38.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true }
libp2p-plaintext = { version = "0.35.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.38.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" }
libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true }
libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.14.0" }
parking_lot = "0.12.0"
pin-project = "1.0.0"
rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature
smallvec = "1.6.1"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false }
libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true }
libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true }
Expand Down
8 changes: 7 additions & 1 deletion core/CHANGELOG.md
@@ -1,6 +1,12 @@
# 0.35.0 [unreleased]

- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724

# 0.34.0

- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-core"
edition = "2021"
rust-version = "1.56.1"
description = "Core traits and structs of libp2p"
version = "0.34.0"
version = "0.35.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
65 changes: 18 additions & 47 deletions core/src/either.rs
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
Expand Down Expand Up @@ -202,60 +202,38 @@ where
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn open_outbound(&self) -> Self::OutboundSubstream {
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}

fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
EitherOutput::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
EitherOutput::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}

fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
}
}

Expand All @@ -267,13 +245,6 @@ where
}
}

#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down
90 changes: 9 additions & 81 deletions core/src/muxing.rs
Expand Up @@ -63,62 +63,25 @@ mod singleton;
/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
///
/// Inbound substreams are reported via [`StreamMuxer::poll_event`].
/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via
/// [`StreamMuxer::poll_outbound`].
/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
/// functions that allow the implementation to make progress on various tasks.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;

/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream;

/// Error type of the muxer
type Error: std::error::Error;

/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
///
/// If `Pending` is returned, then the current task will be notified once the muxer
/// is ready to be polled, similar to the API of `Stream::poll()`.
/// Only the latest task that was used to call this method may be notified.
///
/// It is permissible and common to use this method to perform background
/// work, such as processing incoming packets and polling timers.
///
/// An error can be generated if the connection has been closed.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
///
/// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced
/// through the methods on the `StreamMuxer` trait.
fn open_outbound(&self) -> Self::OutboundSubstream;
/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

/// Polls the outbound substream.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be polled, similar to the API of `Future::poll()`.
/// However, for each individual outbound substream, only the latest task that was used to
/// call this method may be notified.
/// Poll for an address change of the underlying connection.
///
/// May panic or produce an undefined result if an earlier polling of the same substream
/// returned `Ready` or `Err`.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
Expand All @@ -132,38 +95,3 @@ pub trait StreamMuxer {
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Event about a connection, reported by an implementation of [`StreamMuxer`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamMuxerEvent<T> {
/// Remote has opened a new substream. Contains the substream in question.
InboundSubstream(T),

/// Address to the remote has changed. The previous one is now obsolete.
///
/// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
/// > can change their IP address while retaining the same QUIC connection.
AddressChange(Multiaddr),
}

impl<T> StreamMuxerEvent<T> {
/// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise
/// returns `None`.
pub fn into_inbound_substream(self) -> Option<T> {
if let StreamMuxerEvent::InboundSubstream(s) = self {
Some(s)
} else {
None
}
}

/// Map the stream within [`StreamMuxerEvent::InboundSubstream`] to a new type.
pub fn map_inbound_stream<O>(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent<O> {
match self {
StreamMuxerEvent::InboundSubstream(stream) => {
StreamMuxerEvent::InboundSubstream(map(stream))
}
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
}
}
}

0 comments on commit 1a553db

Please sign in to comment.