Skip to content

Commit

Permalink
core/muxing: Replace Into<io::Error> bound on StreamMuxer with `s…
Browse files Browse the repository at this point in the history
…td::error::Error` (#2710)

* core/muxing: Remove `Into<io::Error>` bound from `StreamMuxer::Error`

This allows us to preserve the type information of a muxer's concrete
error as long as possible. For `StreamMuxerBox`, we leverage `io::Error`'s
capability of wrapping any error that implements `Into<Box<dyn Error>>`.

* Use `?` in `Connection::poll`

* Use `?` in `muxing::boxed::Wrap`

* Use `futures::ready!` in `muxing::boxed::Wrap`

* Fill PR number into changelog

* Put `Error + Send + Sync` bounds directly on `StreamMuxer::Error`

* Move `Send + Sync` bounds to higher layers

* Use `map_inbound_stream` helper

* Update changelog to match new implementation
  • Loading branch information
thomaseizinger committed Jun 24, 2022
1 parent eb490c0 commit 0f40e51
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 45 deletions.
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -3,9 +3,11 @@
- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].

[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710

# 0.33.0

Expand Down
14 changes: 7 additions & 7 deletions core/src/either.rs
Expand Up @@ -203,7 +203,7 @@ where
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = io::Error;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
Expand All @@ -212,11 +212,11 @@ where
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
}
}
Expand All @@ -237,11 +237,11 @@ where
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
.map_err(|e| e.into()),
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
.map_err(|e| e.into()),
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}
Expand All @@ -261,8 +261,8 @@ where

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/muxing.rs
Expand Up @@ -52,7 +52,6 @@

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::io;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
Expand All @@ -76,7 +75,7 @@ pub trait StreamMuxer {
type OutboundSubstream;

/// Error type of the muxer
type Error: Into<io::Error>;
type Error: std::error::Error;

/// Polls for a connection-wide event.
///
Expand Down
38 changes: 18 additions & 20 deletions core/src/muxing/boxed.rs
@@ -1,8 +1,9 @@
use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer;
use fnv::FnvHashMap;
use futures::{AsyncRead, AsyncWrite};
use futures::{ready, AsyncRead, AsyncWrite};
use parking_lot::Mutex;
use std::error::Error;
use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
Expand Down Expand Up @@ -38,6 +39,7 @@ impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{
type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype
Expand All @@ -48,18 +50,10 @@ where
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
let substream = match self.inner.poll_event(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => {
return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a)))
}
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?)
.map_inbound_stream(SubstreamBox::new);

Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(SubstreamBox::new(
substream,
))))
Poll::Ready(Ok(event))
}

#[inline]
Expand All @@ -77,16 +71,12 @@ where
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut list = self.outbound.lock();
let substream = match self
let stream = ready!(self
.inner
.poll_outbound(cx, list.get_mut(substream).unwrap())
{
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(s)) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
.map_err(into_io_error)?);

Poll::Ready(Ok(SubstreamBox::new(substream)))
Poll::Ready(Ok(SubstreamBox::new(stream)))
}

#[inline]
Expand All @@ -98,17 +88,25 @@ where

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(|e| e.into())
self.inner.poll_close(cx).map_err(into_io_error)
}
}

fn into_io_error<E>(err: E) -> io::Error
where
E: Error + Send + Sync + 'static,
{
io::Error::new(io::ErrorKind::Other, err)
}

impl StreamMuxerBox {
/// Turns a stream muxer into a `StreamMuxerBox`.
pub fn new<T>(muxer: T) -> StreamMuxerBox
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{
let wrap = Wrap {
inner: muxer,
Expand Down
1 change: 1 addition & 0 deletions core/src/transport/upgrade.rs
Expand Up @@ -302,6 +302,7 @@ impl<T> Multiplexed<T> {
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + Unpin + 'static,
M::OutboundSubstream: Send + 'static,
M::Error: Send + Sync + 'static,
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
}
Expand Down
18 changes: 8 additions & 10 deletions swarm/src/connection.rs
Expand Up @@ -137,40 +137,38 @@ where
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop {
// Poll the handler for new events.
match self.handler.poll(cx) {
match self.handler.poll(cx)? {
Poll::Pending => {}
Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => {
Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => {
self.muxing.open_substream(user_data);
continue;
}
Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => {
Poll::Ready(handler_wrapper::Event::Custom(event)) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
}

// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
match self.muxing.poll(cx) {
match self.muxing.poll(cx)? {
Poll::Pending => {}
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
Poll::Ready(SubstreamEvent::InboundSubstream { substream }) => {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue;
}
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
Poll::Ready(SubstreamEvent::OutboundSubstream {
user_data,
substream,
})) => {
}) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue;
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
Poll::Ready(SubstreamEvent::AddressChange(address)) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

return Poll::Pending;
Expand Down
6 changes: 6 additions & 0 deletions swarm/src/connection/error.rs
Expand Up @@ -75,6 +75,12 @@ impl<THandlerErr> From<handler_wrapper::Error<THandlerErr>> for ConnectionError<
}
}

impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
fn from(error: io::Error) -> Self {
ConnectionError::IO(error)
}
}

/// Errors that can occur in the context of a pending outgoing `Connection`.
///
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
Expand Down
12 changes: 6 additions & 6 deletions swarm/src/connection/substream.rs
Expand Up @@ -23,7 +23,7 @@ use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use smallvec::SmallVec;
use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
use std::{fmt, pin::Pin, task::Context, task::Poll};

/// Endpoint for a received substream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -134,7 +134,7 @@ where
pub fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, TMuxer::Error>> {
// Polling inbound substream.
match self.inner.poll_event(cx) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
Expand All @@ -143,7 +143,7 @@ where
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => {
return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr)))
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {}
}

Expand All @@ -164,7 +164,7 @@ where
}
Poll::Ready(Err(err)) => {
self.inner.destroy_outbound(outbound);
return Poll::Ready(Err(err.into()));
return Poll::Ready(Err(err));
}
}
}
Expand Down Expand Up @@ -203,13 +203,13 @@ impl<TMuxer> Future for Close<TMuxer>
where
TMuxer: StreamMuxer,
{
type Output = Result<(), IoError>;
type Output = Result<(), TMuxer::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.muxer.poll_close(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
}
}
}
Expand Down

0 comments on commit 0f40e51

Please sign in to comment.