Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Replace Into<io::Error> bound on StreamMuxer with std::error::Error #2710

Merged
merged 9 commits into from Jun 24, 2022
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -3,9 +3,11 @@
- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].

[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710

# 0.33.0

Expand Down
14 changes: 7 additions & 7 deletions core/src/either.rs
Expand Up @@ -203,7 +203,7 @@ where
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = io::Error;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
Expand All @@ -212,11 +212,11 @@ where
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
}
}
Expand All @@ -237,11 +237,11 @@ where
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
.map_err(|e| e.into()),
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
.map_err(|e| e.into()),
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}
Expand All @@ -261,8 +261,8 @@ where

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/muxing.rs
Expand Up @@ -52,7 +52,6 @@

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::io;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
Expand All @@ -76,7 +75,7 @@ pub trait StreamMuxer {
type OutboundSubstream;

/// Error type of the muxer
type Error: Into<io::Error>;
type Error: std::error::Error;

/// Polls for a connection-wide event.
///
Expand Down
38 changes: 18 additions & 20 deletions core/src/muxing/boxed.rs
@@ -1,8 +1,9 @@
use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer;
use fnv::FnvHashMap;
use futures::{AsyncRead, AsyncWrite};
use futures::{ready, AsyncRead, AsyncWrite};
use parking_lot::Mutex;
use std::error::Error;
use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
Expand Down Expand Up @@ -38,6 +39,7 @@ impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{
type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype
Expand All @@ -48,18 +50,10 @@ where
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
let substream = match self.inner.poll_event(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => {
return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a)))
}
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?)
.map_inbound_stream(SubstreamBox::new);

Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(SubstreamBox::new(
substream,
))))
Poll::Ready(Ok(event))
}

#[inline]
Expand All @@ -77,16 +71,12 @@ where
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut list = self.outbound.lock();
let substream = match self
let stream = ready!(self
.inner
.poll_outbound(cx, list.get_mut(substream).unwrap())
{
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(s)) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
.map_err(into_io_error)?);

Poll::Ready(Ok(SubstreamBox::new(substream)))
Poll::Ready(Ok(SubstreamBox::new(stream)))
}

#[inline]
Expand All @@ -98,17 +88,25 @@ where

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(|e| e.into())
self.inner.poll_close(cx).map_err(into_io_error)
}
}

fn into_io_error<E>(err: E) -> io::Error
where
E: Error + Send + Sync + 'static,
{
io::Error::new(io::ErrorKind::Other, err)
}

impl StreamMuxerBox {
/// Turns a stream muxer into a `StreamMuxerBox`.
pub fn new<T>(muxer: T) -> StreamMuxerBox
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{
let wrap = Wrap {
inner: muxer,
Expand Down
1 change: 1 addition & 0 deletions core/src/transport/upgrade.rs
Expand Up @@ -302,6 +302,7 @@ impl<T> Multiplexed<T> {
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + Unpin + 'static,
M::OutboundSubstream: Send + 'static,
M::Error: Send + Sync + 'static,
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
}
Expand Down
18 changes: 8 additions & 10 deletions swarm/src/connection.rs
Expand Up @@ -137,40 +137,38 @@ where
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop {
// Poll the handler for new events.
match self.handler.poll(cx) {
match self.handler.poll(cx)? {
Poll::Pending => {}
Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => {
Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => {
self.muxing.open_substream(user_data);
continue;
}
Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => {
Poll::Ready(handler_wrapper::Event::Custom(event)) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
}

// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
match self.muxing.poll(cx) {
match self.muxing.poll(cx)? {
Poll::Pending => {}
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
Poll::Ready(SubstreamEvent::InboundSubstream { substream }) => {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue;
}
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
Poll::Ready(SubstreamEvent::OutboundSubstream {
user_data,
substream,
})) => {
}) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue;
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
Poll::Ready(SubstreamEvent::AddressChange(address)) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

return Poll::Pending;
Expand Down
6 changes: 6 additions & 0 deletions swarm/src/connection/error.rs
Expand Up @@ -75,6 +75,12 @@ impl<THandlerErr> From<handler_wrapper::Error<THandlerErr>> for ConnectionError<
}
}

impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
fn from(error: io::Error) -> Self {
ConnectionError::IO(error)
}
}

/// Errors that can occur in the context of a pending outgoing `Connection`.
///
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
Expand Down
12 changes: 6 additions & 6 deletions swarm/src/connection/substream.rs
Expand Up @@ -23,7 +23,7 @@ use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use smallvec::SmallVec;
use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
use std::{fmt, pin::Pin, task::Context, task::Poll};

/// Endpoint for a received substream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -134,7 +134,7 @@ where
pub fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, TMuxer::Error>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in scope for this PR, but while reviewing I was wondering: Do we even need the TMuxer generic on Muxing? I think the actual type is always StreamMuxerBox. Muxing is private the swarm crate and only used by Connection and there we already use the concrete StreamMuxerBox.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to inline the entire Muxing type into Connection once poll_outbound is removed from StreamMuxer. This is already done in #2648 if you want to see what it looks like :)

I am also tempted to inline HandlerWrapper and centralize all the connection handling logic directly in Connection. This would remove quite a bit of indirection because substreams would directly be passed from the StreamMuxer to the ConnectionHandler.

// Polling inbound substream.
match self.inner.poll_event(cx) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
Expand All @@ -143,7 +143,7 @@ where
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => {
return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr)))
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {}
}

Expand All @@ -164,7 +164,7 @@ where
}
Poll::Ready(Err(err)) => {
self.inner.destroy_outbound(outbound);
return Poll::Ready(Err(err.into()));
return Poll::Ready(Err(err));
}
}
}
Expand Down Expand Up @@ -203,13 +203,13 @@ impl<TMuxer> Future for Close<TMuxer>
where
TMuxer: StreamMuxer,
{
type Output = Result<(), IoError>;
type Output = Result<(), TMuxer::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.muxer.poll_close(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
}
}
}
Expand Down