Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Have functions on StreamMuxer take Pin<&mut Self> #2765

Merged
merged 12 commits into from Aug 3, 2022
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -4,11 +4,13 @@
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765

# 0.34.0

Expand Down
43 changes: 27 additions & 16 deletions core/src/either.rs
Expand Up @@ -204,43 +204,54 @@ where
type Substream = EitherOutput<A::Substream, B::Substream>;
type Error = EitherError<A::Error, B::Error>;

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
EitherOutputProj::Second(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
EitherOutputProj::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
}
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand Down
119 changes: 115 additions & 4 deletions core/src/muxing.rs
Expand Up @@ -52,6 +52,8 @@

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::future::Future;
use std::pin::Pin;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
Expand All @@ -73,15 +75,24 @@ pub trait StreamMuxer {
type Error: std::error::Error;

/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for an address change of the underlying connection.
///
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
Expand All @@ -93,5 +104,105 @@ pub trait StreamMuxer {
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Extension trait for [`StreamMuxer`].
pub trait StreamMuxerExt: StreamMuxer + Sized {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
/// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_inbound_unpin(
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_inbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_outbound`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_outbound_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_address_change_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_address_change(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}

/// Returns a future that resolves to the next inbound `Substream` opened by the remote.
fn next_inbound(&mut self) -> NextInbound<'_, Self> {
NextInbound(self)
}

/// Returns a future that opens a new outbound `Substream` with the remote.
fn next_outbound(&mut self) -> NextOutbound<'_, Self> {
NextOutbound(self)
}

/// Returns a future for closing this [`StreamMuxer`].
fn close(self) -> Close<Self> {
Close(self)
}
}

impl<S> StreamMuxerExt for S where S: StreamMuxer {}

pub struct NextInbound<'a, S>(&'a mut S);

pub struct NextOutbound<'a, S>(&'a mut S);

pub struct Close<S>(S);

impl<'a, S> Future for NextInbound<'a, S>
where
S: StreamMuxer + Unpin,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
{
type Output = Result<S::Substream, S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_inbound_unpin(cx)
}
}

impl<'a, S> Future for NextOutbound<'a, S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<S::Substream, S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_outbound_unpin(cx)
}
}

impl<S> Future for Close<S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<(), S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_close_unpin(cx)
}
}
68 changes: 50 additions & 18 deletions core/src/muxing/boxed.rs
@@ -1,6 +1,7 @@
use crate::StreamMuxer;
use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
use std::io;
Expand All @@ -10,7 +11,7 @@ use std::task::{Context, Poll};

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
inner: Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>,
inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send>>,
}

/// Abstract type for asynchronous reading and writing.
Expand All @@ -19,10 +20,12 @@ pub struct StreamMuxerBox {
/// and `AsyncWrite` capabilities.
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);

#[pin_project]
struct Wrap<T>
where
T: StreamMuxer,
{
#[pin]
inner: T,
}

Expand All @@ -36,26 +39,40 @@ where
type Error = io::Error;

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(into_io_error)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project()
.inner
.poll_inbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project()
.inner
.poll_outbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
self.inner.poll_address_change(cx).map_err(into_io_error)
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project()
.inner
.poll_address_change(cx)
.map_err(into_io_error)
}
}

Expand All @@ -77,30 +94,45 @@ impl StreamMuxerBox {
let wrap = Wrap { inner: muxer };

StreamMuxerBox {
inner: Box::new(wrap),
inner: Box::pin(wrap),
}
}

fn project(
self: Pin<&mut Self>,
) -> Pin<&mut (dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send)> {
self.get_mut().inner.as_mut()
}
}

impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_inbound(cx)
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project().poll_inbound(cx)
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_outbound(cx)
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project().poll_outbound(cx)
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
self.inner.poll_address_change(cx)
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project().poll_address_change(cx)
}
}

Expand Down