Skip to content

Commit

Permalink
Re-design the StreamMuxer trait
Browse files Browse the repository at this point in the history
The current `StreamMuxer` API is based around a poll-based approach
where each substream needs to be passed by value to read/write/close
it.

The muxer itself however is not available in the end-user abstractions
like `ConnectionHandler`. To circumvent this, the `StreamMuxerBox`
type exists which allows a `StreamMuxer` to be cloned. Together with
`SubstreamRef`, this allows each substream to own a reference to the
muxer it was created with and pass itself to the muxer within
implementations of `AsyncRead` and `AsyncWrite`. These implementations
are convenient as they allow an end-user to simply read and write to
the substream without holding a reference to anything else.

We can achieve the same goal by changing the `StreamMuxerBox` abstraction
to use a `SubstreamBox`. Users of `libp2p-core` can continue to
depend on the `Substream` associated type of `StreamMuxer`.
  • Loading branch information
thomaseizinger committed Jun 15, 2022
1 parent 4c39aec commit d44b0a6
Show file tree
Hide file tree
Showing 18 changed files with 449 additions and 1,263 deletions.
3 changes: 2 additions & 1 deletion core/CHANGELOG.md
Expand Up @@ -4,12 +4,13 @@
- Remove `StreamMuxer::flush_all`. See [PR 2669].
- Rename `StreamMuxer::close` to `StreamMuxer::poll_close`. See [PR 2666].
- Remove deprecated function `StreamMuxer::is_remote_acknowledged`. See [PR 2665].
- Re-design `StreamMuxer` trait to take `&mut self` and remove all associated types but `Substream`. See [PR 2648].

[PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529
[PR 2666]: https://github.com/libp2p/rust-libp2p/pull/2666
[PR 2665]: https://github.com/libp2p/rust-libp2p/pull/2665
[PR 2669]: https://github.com/libp2p/rust-libp2p/pull/2669

[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648

# 0.32.1

Expand Down
150 changes: 12 additions & 138 deletions core/src/either.rs
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
muxing::{OutboundSubstreamId, StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
Multiaddr, ProtocolName,
};
Expand Down Expand Up @@ -202,162 +202,36 @@ where
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;

fn poll_event(
&self,
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
match self {
EitherOutput::First(inner) => inner.poll_event(cx).map(|result| {
result.map(|event| match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream))
}
})
}),
EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| {
result.map(|event| match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream))
}
})
}),
}
}

fn open_outbound(&self) -> Self::OutboundSubstream {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}

fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<io::Result<Self::Substream>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First)),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second)),
_ => panic!("Wrong API usage"),
}
}

fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
match self {
EitherOutput::First(inner) => match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
}
}

fn read_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(cx, sub, buf)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(cx, sub, buf)
}
_ => panic!("Wrong API usage"),
}
}

fn write_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(cx, sub, buf)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(cx, sub, buf)
}
_ => panic!("Wrong API usage"),
}
}

fn flush_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<io::Result<()>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(cx, sub)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(cx, sub)
EitherOutput::First(inner) => {
inner.poll(cx).map_ok(|e| e.map_stream(EitherOutput::First))
}
_ => panic!("Wrong API usage"),
EitherOutput::Second(inner) => inner
.poll(cx)
.map_ok(|e| e.map_stream(EitherOutput::Second)),
}
}

fn shutdown_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<io::Result<()>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(cx, sub)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(cx, sub)
}
_ => panic!("Wrong API usage"),
}
}

fn destroy_substream(&self, substream: Self::Substream) {
fn open_outbound(&mut self) -> OutboundSubstreamId {
match self {
EitherOutput::First(inner) => match substream {
EitherOutput::First(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutput::Second(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::First(inner) => inner.open_outbound(),
EitherOutput::Second(inner) => inner.open_outbound(),
}
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx),
EitherOutput::Second(inner) => inner.poll_close(cx),
}
}
}

#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
Expand Down

0 comments on commit d44b0a6

Please sign in to comment.