Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Flatten StreamMuxer interface to poll_{inbound,outbound,address_change,close} #2724

Merged
merged 26 commits into from Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fffa5ff
Introduce `StreamMuxerEvent::OutboundSubstream`
thomaseizinger Jun 22, 2022
e8630d7
Introduce `StreamMuxerEvent::map_outbound_substream`
thomaseizinger Jun 22, 2022
7eeae1b
Remove "outbound" functions on `StreamMuxer` in favor of `OpenFlags`
thomaseizinger Jun 22, 2022
66dc071
Remove `OutboundStreamId`
thomaseizinger Jun 22, 2022
f155e6b
Inline `Muxing` type into `Connection`
thomaseizinger Jun 22, 2022
03861fc
Remove obvious comment on how `poll` functions work
thomaseizinger Jun 22, 2022
ed8f0b7
Fix benchmarks
thomaseizinger Jun 22, 2022
3c3bed3
Fix intra-doc link
thomaseizinger Jun 23, 2022
90b7ad6
Fix documentation of StreamMuxer
thomaseizinger Jun 24, 2022
33c9491
Minor import and comment cleanup
thomaseizinger Jun 24, 2022
546603c
Remove `StreamMuxerEvent` in favor of individual poll functions
thomaseizinger Jun 24, 2022
68b4c0b
Fix mplex tests and benches
thomaseizinger Jun 24, 2022
4a24307
Remove mentions of deleted event
thomaseizinger Jun 24, 2022
e742b8e
Update changelog
thomaseizinger Jun 24, 2022
2348a4c
Remove now unused bitflags dependency
thomaseizinger Jun 24, 2022
7695a80
Remove unused log dependency
thomaseizinger Jun 24, 2022
a086e61
Better `expect` message
thomaseizinger Jun 24, 2022
dd81ea0
Use correct function in benchmark
thomaseizinger Jun 24, 2022
f5cba3d
Merge branch 'master' into refactor/remove-poll-outbound
thomaseizinger Jul 15, 2022
3143a63
Update changelog and version numbers
thomaseizinger Jul 15, 2022
bea8318
Add unreleased label
thomaseizinger Jul 15, 2022
8bbdbfa
Add docs to `StreamMuxer`
thomaseizinger Jul 15, 2022
9409250
Fix changelog of yamux and mplex
thomaseizinger Jul 15, 2022
4f7bd45
Explicitly return result
thomaseizinger Jul 15, 2022
f20fd74
Add docs for poll loop in `Connection`
thomaseizinger Jul 15, 2022
3f776ef
Fixup a few more changelogs and manifests
thomaseizinger Jul 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.toml
Expand Up @@ -78,37 +78,37 @@ instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"

libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true }
libp2p-core = { version = "0.34.0", path = "core", default-features = false }
libp2p-core = { version = "0.35.0", path = "core", default-features = false }
libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true }
libp2p-mplex = { version = "0.35.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.38.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true }
libp2p-plaintext = { version = "0.35.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.38.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" }
libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true }
libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.14.0" }
parking_lot = "0.12.0"
pin-project = "1.0.0"
rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature
smallvec = "1.6.1"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false }
libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true }
libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true }
Expand Down
8 changes: 7 additions & 1 deletion core/CHANGELOG.md
@@ -1,6 +1,12 @@
# 0.35.0 [unreleased]

- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724

# 0.34.0

- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deleted this changelog entry because it is not released yet and it no longer makes any sense if the entire type is gone now!

- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-core"
edition = "2021"
rust-version = "1.56.1"
description = "Core traits and structs of libp2p"
version = "0.34.0"
version = "0.35.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
65 changes: 18 additions & 47 deletions core/src/either.rs
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
Expand Down Expand Up @@ -202,60 +202,38 @@ where
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn open_outbound(&self) -> Self::OutboundSubstream {
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}

fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
EitherOutput::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
EitherOutput::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}

fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
}
}

Expand All @@ -267,13 +245,6 @@ where
}
}

#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down
90 changes: 9 additions & 81 deletions core/src/muxing.rs
Expand Up @@ -63,62 +63,25 @@ mod singleton;
/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
///
/// Inbound substreams are reported via [`StreamMuxer::poll_event`].
/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via
/// [`StreamMuxer::poll_outbound`].
/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
/// functions that allow the implementation to make progress on various tasks.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;

/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream;

/// Error type of the muxer
type Error: std::error::Error;

/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
///
/// If `Pending` is returned, then the current task will be notified once the muxer
/// is ready to be polled, similar to the API of `Stream::poll()`.
/// Only the latest task that was used to call this method may be notified.
///
/// It is permissible and common to use this method to perform background
/// work, such as processing incoming packets and polling timers.
///
/// An error can be generated if the connection has been closed.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
///
/// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced
/// through the methods on the `StreamMuxer` trait.
fn open_outbound(&self) -> Self::OutboundSubstream;
/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;

/// Polls the outbound substream.
///
/// If `Pending` is returned, then the current task will be notified once the substream
/// is ready to be polled, similar to the API of `Future::poll()`.
/// However, for each individual outbound substream, only the latest task that was used to
/// call this method may be notified.
/// Poll for an address change of the underlying connection.
///
/// May panic or produce an undefined result if an earlier polling of the same substream
/// returned `Ready` or `Err`.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
Expand All @@ -132,38 +95,3 @@ pub trait StreamMuxer {
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Event about a connection, reported by an implementation of [`StreamMuxer`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamMuxerEvent<T> {
/// Remote has opened a new substream. Contains the substream in question.
InboundSubstream(T),

/// Address to the remote has changed. The previous one is now obsolete.
///
/// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
/// > can change their IP address while retaining the same QUIC connection.
AddressChange(Multiaddr),
}

impl<T> StreamMuxerEvent<T> {
/// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise
/// returns `None`.
pub fn into_inbound_substream(self) -> Option<T> {
if let StreamMuxerEvent::InboundSubstream(s) = self {
Some(s)
} else {
None
}
}

/// Map the stream within [`StreamMuxerEvent::InboundSubstream`] to a new type.
pub fn map_inbound_stream<O>(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent<O> {
match self {
StreamMuxerEvent::InboundSubstream(stream) => {
StreamMuxerEvent::InboundSubstream(map(stream))
}
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
}
}
}