Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Have functions on StreamMuxer take Pin<&mut Self> #2765

Merged
merged 12 commits into from Aug 3, 2022
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -3,9 +3,11 @@
- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765

# 0.34.0

Expand Down
43 changes: 27 additions & 16 deletions core/src/either.rs
Expand Up @@ -204,43 +204,54 @@ where
type Substream = EitherOutput<A::Substream, B::Substream>;
type Error = EitherError<A::Error, B::Error>;

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
EitherOutputProj::Second(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
EitherOutputProj::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
}
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand Down
146 changes: 142 additions & 4 deletions core/src/muxing.rs
Expand Up @@ -52,6 +52,8 @@

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::future::Future;
use std::pin::Pin;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
Expand All @@ -73,15 +75,24 @@ pub trait StreamMuxer {
type Error: std::error::Error;

/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for an address change of the underlying connection.
///
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
Expand All @@ -93,5 +104,132 @@ pub trait StreamMuxer {
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Extension trait for [`StreamMuxer`].
pub trait StreamMuxerExt: StreamMuxer + Sized {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
fn poll_inbound_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin;

fn poll_outbound_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin;

fn poll_address_change_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
where
Self: Unpin;

fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin;

fn next_inbound(&mut self) -> NextInbound<'_, Self>;

fn next_outbound(&mut self) -> NextOutbound<'_, Self>;

fn close(self) -> Close<Self>;
}

impl<S> StreamMuxerExt for S
where
S: StreamMuxer,
{
fn poll_inbound_unpin(
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_inbound(cx)
}

fn poll_outbound_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_outbound(cx)
}

fn poll_address_change_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_address_change(cx)
}

fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}

fn next_inbound(&mut self) -> NextInbound<'_, Self> {
NextInbound(self)
}

fn next_outbound(&mut self) -> NextOutbound<'_, Self> {
NextOutbound(self)
}

fn close(self) -> Close<Self> {
Close(self)
}
}

pub struct NextInbound<'a, S>(&'a mut S);

pub struct NextOutbound<'a, S>(&'a mut S);

pub struct Close<S>(S);

impl<'a, S> Future for NextInbound<'a, S>
where
S: StreamMuxer + Unpin,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
{
type Output = Result<S::Substream, S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_inbound_unpin(cx)
}
}

impl<'a, S> Future for NextOutbound<'a, S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<S::Substream, S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_outbound_unpin(cx)
}
}

impl<S> Future for Close<S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<(), S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_close_unpin(cx)
}
}
69 changes: 51 additions & 18 deletions core/src/muxing/boxed.rs
@@ -1,6 +1,7 @@
use crate::StreamMuxer;
use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
use std::io;
Expand All @@ -10,7 +11,7 @@ use std::task::{Context, Poll};

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
inner: Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send + Sync>,
inner: Pin<Box<dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send + Sync>>,
}

/// Abstract type for asynchronous reading and writing.
Expand All @@ -19,10 +20,12 @@ pub struct StreamMuxerBox {
/// and `AsyncWrite` capabilities.
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);

#[pin_project]
struct Wrap<T>
where
T: StreamMuxer,
{
#[pin]
inner: T,
}

Expand All @@ -36,26 +39,40 @@ where
type Error = io::Error;

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(into_io_error)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project()
.inner
.poll_inbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project()
.inner
.poll_outbound(cx)
.map_ok(SubstreamBox::new)
.map_err(into_io_error)
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
self.inner.poll_address_change(cx).map_err(into_io_error)
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project()
.inner
.poll_address_change(cx)
.map_err(into_io_error)
}
}

Expand All @@ -77,30 +94,46 @@ impl StreamMuxerBox {
let wrap = Wrap { inner: muxer };

StreamMuxerBox {
inner: Box::new(wrap),
inner: Box::pin(wrap),
}
}

fn project(
self: Pin<&mut Self>,
) -> Pin<&mut (dyn StreamMuxer<Substream = SubstreamBox, Error = io::Error> + Send + Sync)>
{
self.get_mut().inner.as_mut()
}
}

impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_inbound(cx)
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project().poll_inbound(cx)
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_outbound(cx)
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.project().poll_outbound(cx)
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
self.inner.poll_address_change(cx)
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project().poll_address_change(cx)
}
}

Expand Down