Skip to content

Commit

Permalink
Flatten errors
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Nov 12, 2022
1 parent e9cd5fc commit 5801576
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
3 changes: 2 additions & 1 deletion protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ impl NetworkBehaviour for Behaviour {
from_fn::OutEvent::OutboundEmitted(Err(e)) => {}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Timeout(info)) => {}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::NegotiationFailed(..)) => {}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::LimitExceeded(..)) => {}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::LimitExceeded { .. }) => {}
from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Unsupported { .. }) => {}
}
//
// let new_events = match event {
Expand Down
7 changes: 6 additions & 1 deletion protocols/rendezvous/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,13 @@ impl NetworkBehaviour for Behaviour {
from_fn::OutEvent::OutboundEmitted(_) => {}
from_fn::OutEvent::FailedToOpen(never) => match never {
from_fn::OpenError::Timeout(never) => void::unreachable(never),
from_fn::OpenError::LimitExceeded(never) => void::unreachable(never),
from_fn::OpenError::LimitExceeded {
open_info: never, ..
} => void::unreachable(never),
from_fn::OpenError::NegotiationFailed(never, _) => void::unreachable(never),
from_fn::OpenError::Unsupported {
open_info: never, ..
} => void::unreachable(never),
},
from_fn::OutEvent::InboundEmitted(Err(error)) => {
log::debug!("Inbound stream from {peer_id} failed: {error}");
Expand Down
54 changes: 43 additions & 11 deletions swarm/src/handler/from_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use futures::stream::{BoxStream, SelectAll};
use futures::{Stream, StreamExt};
use libp2p_core::connection::ConnectionId;
use libp2p_core::either::EitherOutput;
use libp2p_core::upgrade::{DeniedUpgrade, EitherUpgrade, NegotiationError, ReadyUpgrade};
use libp2p_core::upgrade::{
DeniedUpgrade, EitherUpgrade, NegotiationError, ProtocolError, ReadyUpgrade,
};
use libp2p_core::{ConnectedPoint, PeerId, UpgradeError};
use std::collections::{HashSet, VecDeque};
use std::error::Error;
Expand Down Expand Up @@ -221,9 +223,19 @@ pub enum OutEvent<I, O, OpenInfo> {

#[derive(Debug)]
pub enum OpenError<OpenInfo> {
/// The time limit for the negotiation handshake was exceeded.
Timeout(OpenInfo),
LimitExceeded(OpenInfo),
NegotiationFailed(OpenInfo, NegotiationError),
/// We have hit the configured limit for the maximum number of pending substreams.
LimitExceeded {
open_info: OpenInfo,
pending_substreams: usize,
},
/// The remote does not support this protocol.
Unsupported {
open_info: OpenInfo,
protocol: &'static str,
},
NegotiationFailed(OpenInfo, ProtocolError),
}

/// A wrapper for state that is shared across all connections.
Expand Down Expand Up @@ -322,8 +334,16 @@ impl<OpenInfo> fmt::Display for OpenError<OpenInfo> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OpenError::Timeout(_) => write!(f, "opening new substream timed out"),
OpenError::LimitExceeded(_) => write!(f, "limit for pending dials exceeded"),
OpenError::LimitExceeded {
pending_substreams, ..
} => write!(
f,
"limit for pending openings ({pending_substreams}) exceeded"
),
OpenError::NegotiationFailed(_, _) => Ok(()), // Don't print anything to avoid double printing of error.
OpenError::Unsupported { protocol, .. } => {
write!(f, "remote peer does not support {protocol}")
}
}
}
}
Expand All @@ -335,8 +355,9 @@ where
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
OpenError::Timeout(_) => None,
OpenError::LimitExceeded(_) => None,
OpenError::LimitExceeded { .. } => None,
OpenError::NegotiationFailed(_, source) => Some(source),
OpenError::Unsupported { .. } => None,
}
}
}
Expand Down Expand Up @@ -537,8 +558,10 @@ where
InEvent::UpdateState(new_state) => self.state = new_state,
InEvent::NewOutbound(open_info) => {
if self.pending_outbound_streams.len() >= self.pending_outbound_streams_limit {
self.failed_open
.push_back(OpenError::LimitExceeded(open_info));
self.failed_open.push_back(OpenError::LimitExceeded {
open_info,
pending_substreams: self.pending_outbound_streams.len(),
});
} else {
self.pending_outbound_streams.push_back(open_info);
}
Expand All @@ -556,9 +579,17 @@ where
self.failed_open.push_back(OpenError::Timeout(info))
}
ConnectionHandlerUpgrErr::Timer => {}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation)) => self
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(error),
)) => self
.failed_open
.push_back(OpenError::NegotiationFailed(info, negotiation)),
.push_back(OpenError::NegotiationFailed(info, error)),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
self.failed_open.push_back(OpenError::Unsupported {
open_info: info,
protocol: self.protocol,
})
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(apply)) => {
void::unreachable(apply)
}
Expand Down Expand Up @@ -835,8 +866,9 @@ mod tests {
OutEvent::InboundEmitted(_) => {}
OutEvent::OutboundEmitted(_) => {}
OutEvent::FailedToOpen(OpenError::Timeout(_)) => {}
OutEvent::FailedToOpen(OpenError::NegotiationFailed(_, _neg_error)) => {}
OutEvent::FailedToOpen(OpenError::LimitExceeded(_)) => {}
OutEvent::FailedToOpen(OpenError::NegotiationFailed(_, _)) => {}
OutEvent::FailedToOpen(OpenError::LimitExceeded { .. }) => {}
OutEvent::FailedToOpen(OpenError::Unsupported { .. }) => {}
}
}

Expand Down

0 comments on commit 5801576

Please sign in to comment.