From b70272d4ea4191aa0fc1f999afd09efab6ef3b00 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sat, 12 Nov 2022 12:34:39 +1100 Subject: [PATCH] Implement `libp2p-ping` with `from_fn` abstraction --- protocols/ping/src/handler.rs | 400 ---------------------------------- protocols/ping/src/lib.rs | 335 ++++++++++++++++++++++++---- 2 files changed, 289 insertions(+), 446 deletions(-) delete mode 100644 protocols/ping/src/handler.rs diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs deleted file mode 100644 index af1ef898981e..000000000000 --- a/protocols/ping/src/handler.rs +++ /dev/null @@ -1,400 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::{protocol, PROTOCOL_NAME}; -use futures::future::BoxFuture; -use futures::prelude::*; -use futures_timer::Delay; -use libp2p_core::upgrade::ReadyUpgrade; -use libp2p_core::{upgrade::NegotiationError, UpgradeError}; -use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, -}; -use std::collections::VecDeque; -use std::{ - error::Error, - fmt, io, - num::NonZeroU32, - task::{Context, Poll}, - time::Duration, -}; -use void::Void; - -/// The configuration for outbound pings. -#[derive(Debug, Clone)] -pub struct Config { - /// The timeout of an outbound ping. - timeout: Duration, - /// The duration between the last successful outbound or inbound ping - /// and the next outbound ping. - interval: Duration, - /// The maximum number of failed outbound pings before the associated - /// connection is deemed unhealthy, indicating to the `Swarm` that it - /// should be closed. - max_failures: NonZeroU32, - /// Whether the connection should generally be kept alive unless - /// `max_failures` occur. - keep_alive: bool, -} - -impl Config { - /// Creates a new [`Config`] with the following default settings: - /// - /// * [`Config::with_interval`] 15s - /// * [`Config::with_timeout`] 20s - /// * [`Config::with_max_failures`] 1 - /// * [`Config::with_keep_alive`] false - /// - /// These settings have the following effect: - /// - /// * A ping is sent every 15 seconds on a healthy connection. - /// * Every ping sent must yield a response within 20 seconds in order to - /// be successful. - /// * A single ping failure is sufficient for the connection to be subject - /// to being closed. - /// * The connection may be closed at any time as far as the ping protocol - /// is concerned, i.e. the ping protocol itself does not keep the - /// connection alive. - pub fn new() -> Self { - Self { - timeout: Duration::from_secs(20), - interval: Duration::from_secs(15), - max_failures: NonZeroU32::new(1).expect("1 != 0"), - keep_alive: false, - } - } - - /// Sets the ping timeout. - pub fn with_timeout(mut self, d: Duration) -> Self { - self.timeout = d; - self - } - - /// Sets the ping interval. - pub fn with_interval(mut self, d: Duration) -> Self { - self.interval = d; - self - } - - /// Sets the maximum number of consecutive ping failures upon which the remote - /// peer is considered unreachable and the connection closed. - pub fn with_max_failures(mut self, n: NonZeroU32) -> Self { - self.max_failures = n; - self - } - - /// Sets whether the ping protocol itself should keep the connection alive, - /// apart from the maximum allowed failures. - /// - /// By default, the ping protocol itself allows the connection to be closed - /// at any time, i.e. in the absence of ping failures the connection lifetime - /// is determined by other protocol handlers. - /// - /// If the maximum number of allowed ping failures is reached, the - /// connection is always terminated as a result of [`ConnectionHandler::poll`] - /// returning an error, regardless of the keep-alive setting. - #[deprecated( - since = "0.40.0", - note = "Use `libp2p::swarm::behaviour::KeepAlive` if you need to keep connections alive unconditionally." - )] - pub fn with_keep_alive(mut self, b: bool) -> Self { - self.keep_alive = b; - self - } -} - -impl Default for Config { - fn default() -> Self { - Self::new() - } -} - -/// The successful result of processing an inbound or outbound ping. -#[derive(Debug)] -pub enum Success { - /// Received a ping and sent back a pong. - Pong, - /// Sent a ping and received back a pong. - /// - /// Includes the round-trip time. - Ping { rtt: Duration }, -} - -/// An outbound ping failure. -#[derive(Debug)] -pub enum Failure { - /// The ping timed out, i.e. no response was received within the - /// configured ping timeout. - Timeout, - /// The peer does not support the ping protocol. - Unsupported, - /// The ping failed for reasons other than a timeout. - Other { - error: Box, - }, -} - -impl fmt::Display for Failure { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Failure::Timeout => f.write_str("Ping timeout"), - Failure::Other { error } => write!(f, "Ping error: {}", error), - Failure::Unsupported => write!(f, "Ping protocol not supported"), - } - } -} - -impl Error for Failure { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - Failure::Timeout => None, - Failure::Other { error } => Some(&**error), - Failure::Unsupported => None, - } - } -} - -/// Protocol handler that handles pinging the remote at a regular period -/// and answering ping queries. -/// -/// If the remote doesn't respond, produces an error that closes the connection. -pub struct Handler { - /// Configuration options. - config: Config, - /// The timer used for the delay to the next ping as well as - /// the ping timeout. - timer: Delay, - /// Outbound ping failures that are pending to be processed by `poll()`. - pending_errors: VecDeque, - /// The number of consecutive ping failures that occurred. - /// - /// Each successful ping resets this counter to 0. - failures: u32, - /// The outbound ping state. - outbound: Option, - /// The inbound pong handler, i.e. if there is an inbound - /// substream, this is always a future that waits for the - /// next inbound ping to be answered. - inbound: Option, - /// Tracks the state of our handler. - state: State, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum State { - /// We are inactive because the other peer doesn't support ping. - Inactive { - /// Whether or not we've reported the missing support yet. - /// - /// This is used to avoid repeated events being emitted for a specific connection. - reported: bool, - }, - /// We are actively pinging the other peer. - Active, -} - -impl Handler { - /// Builds a new [`Handler`] with the given configuration. - pub fn new(config: Config) -> Self { - Handler { - config, - timer: Delay::new(Duration::new(0, 0)), - pending_errors: VecDeque::with_capacity(2), - failures: 0, - outbound: None, - inbound: None, - state: State::Active, - } - } -} - -impl ConnectionHandler for Handler { - type InEvent = Void; - type OutEvent = crate::Result; - type Error = Failure; - type InboundProtocol = ReadyUpgrade<&'static [u8]>; - type OutboundProtocol = ReadyUpgrade<&'static [u8]>; - type OutboundOpenInfo = (); - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol, ()> { - SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) - } - - fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream, (): ()) { - self.inbound = Some(protocol::recv_ping(stream).boxed()); - } - - fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) { - self.timer.reset(self.config.timeout); - self.outbound = Some(OutboundState::Ping(protocol::send_ping(stream).boxed())); - } - - fn inject_event(&mut self, _: Void) {} - - fn inject_dial_upgrade_error(&mut self, _info: (), error: ConnectionHandlerUpgrErr) { - self.outbound = None; // Request a new substream on the next `poll`. - - let error = match error { - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - debug_assert_eq!(self.state, State::Active); - - self.state = State::Inactive { reported: false }; - return; - } - // Note: This timeout only covers protocol negotiation. - ConnectionHandlerUpgrErr::Timeout => Failure::Timeout, - e => Failure::Other { error: Box::new(e) }, - }; - - self.pending_errors.push_front(error); - } - - fn connection_keep_alive(&self) -> KeepAlive { - if self.config.keep_alive { - KeepAlive::Yes - } else { - KeepAlive::No - } - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, (), crate::Result, Self::Error>> - { - match self.state { - State::Inactive { reported: true } => { - return Poll::Pending; // nothing to do on this connection - } - State::Inactive { reported: false } => { - self.state = State::Inactive { reported: true }; - return Poll::Ready(ConnectionHandlerEvent::Custom(Err(Failure::Unsupported))); - } - State::Active => {} - } - - // Respond to inbound pings. - if let Some(fut) = self.inbound.as_mut() { - match fut.poll_unpin(cx) { - Poll::Pending => {} - Poll::Ready(Err(e)) => { - log::debug!("Inbound ping error: {:?}", e); - self.inbound = None; - } - Poll::Ready(Ok(stream)) => { - // A ping from a remote peer has been answered, wait for the next. - self.inbound = Some(protocol::recv_ping(stream).boxed()); - return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Pong))); - } - } - } - - loop { - // Check for outbound ping failures. - if let Some(error) = self.pending_errors.pop_back() { - log::debug!("Ping failure: {:?}", error); - - self.failures += 1; - - // Note: For backward-compatibility, with configured - // `max_failures == 1`, the first failure is always "free" - // and silent. This allows peers who still use a new substream - // for each ping to have successful ping exchanges with peers - // that use a single substream, since every successful ping - // resets `failures` to `0`, while at the same time emitting - // events only for `max_failures - 1` failures, as before. - if self.failures > 1 || self.config.max_failures.get() > 1 { - if self.failures >= self.config.max_failures.get() { - log::debug!("Too many failures ({}). Closing connection.", self.failures); - return Poll::Ready(ConnectionHandlerEvent::Close(error)); - } - - return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error))); - } - } - - // Continue outbound pings. - match self.outbound.take() { - Some(OutboundState::Ping(mut ping)) => match ping.poll_unpin(cx) { - Poll::Pending => { - if self.timer.poll_unpin(cx).is_ready() { - self.pending_errors.push_front(Failure::Timeout); - } else { - self.outbound = Some(OutboundState::Ping(ping)); - break; - } - } - Poll::Ready(Ok((stream, rtt))) => { - self.failures = 0; - self.timer.reset(self.config.interval); - self.outbound = Some(OutboundState::Idle(stream)); - return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(Success::Ping { - rtt, - }))); - } - Poll::Ready(Err(e)) => { - self.pending_errors - .push_front(Failure::Other { error: Box::new(e) }); - } - }, - Some(OutboundState::Idle(stream)) => match self.timer.poll_unpin(cx) { - Poll::Pending => { - self.outbound = Some(OutboundState::Idle(stream)); - break; - } - Poll::Ready(()) => { - self.timer.reset(self.config.timeout); - self.outbound = - Some(OutboundState::Ping(protocol::send_ping(stream).boxed())); - } - }, - Some(OutboundState::OpenStream) => { - self.outbound = Some(OutboundState::OpenStream); - break; - } - None => { - self.outbound = Some(OutboundState::OpenStream); - let protocol = SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) - .with_timeout(self.config.timeout); - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol, - }); - } - } - } - - Poll::Pending - } -} - -type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>; -type PongFuture = BoxFuture<'static, Result>; - -/// The current state w.r.t. outbound pings. -enum OutboundState { - /// A new substream is being negotiated for the ping protocol. - OpenStream, - /// The substream is idle, waiting to send the next ping. - Idle(NegotiatedSubstream), - /// A ping is being sent and the response awaited. - Ping(PingFuture), -} diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 02babd2ea06a..763e09097435 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -42,50 +42,40 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod handler; mod protocol; -use handler::Handler; -pub use handler::{Config, Failure, Success}; -use libp2p_core::{connection::ConnectionId, PeerId}; -use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, +use crate::protocol::{recv_ping, send_ping}; +use futures::future::Either; +use futures_timer::Delay; +use libp2p_core::connection::ConnectionId; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; +use libp2p_swarm::handler::from_fn; +use libp2p_swarm::{ + CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; +use std::collections::{HashMap, VecDeque}; +use std::error::Error; +use std::num::NonZeroU32; +use std::task::{Context, Poll}; +use std::time::Duration; +use std::{fmt, io}; -#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Config instead.")] -pub type PingConfig = Config; +pub use crate::protocol::PROTOCOL_NAME; -#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Event instead.")] -pub type PingEvent = Event; - -#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Success instead.")] -pub type PingSuccess = Success; - -#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Failure instead.")] -pub type PingFailure = Failure; - -#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Result instead.")] -pub type PingResult = Result; - -#[deprecated(since = "0.39.1", note = "Use libp2p::ping::Behaviour instead.")] -pub type Ping = Behaviour; - -pub use self::protocol::PROTOCOL_NAME; - -/// The result of an inbound or outbound ping. pub type Result = std::result::Result; +type Handler = from_fn::FromFnProto; + /// A [`NetworkBehaviour`] that responds to inbound pings and /// periodically sends outbound pings on every established connection. /// /// See the crate root documentation for more information. +#[derive(Default)] pub struct Behaviour { /// Configuration for outbound pings. config: Config, - /// Queue of events to yield to the swarm. - events: VecDeque, + actions: VecDeque>, + failures: HashMap<(PeerId, ConnectionId), (u32, VecDeque)>, } /// Event generated by the `Ping` network behaviour. @@ -102,14 +92,138 @@ impl Behaviour { pub fn new(config: Config) -> Self { Self { config, - events: VecDeque::new(), + actions: Default::default(), + failures: Default::default(), + } + } + + fn reset_num_failures(&mut self, peer: PeerId, connection_id: ConnectionId) { + self.failures.entry((peer, connection_id)).or_default().0 = 0; + } + + fn record_failure(&mut self, peer: PeerId, connection_id: ConnectionId, e: Failure) { + self.failures + .entry((peer, connection_id)) + .or_default() + .1 + .push_back(e); + } +} + +/// The configuration for outbound pings. +#[derive(Debug, Clone)] +pub struct Config { + /// The timeout of an outbound ping. + pub(crate) timeout: Duration, + /// The duration between the last successful outbound or inbound ping + /// and the next outbound ping. + pub(crate) interval: Duration, + /// The maximum number of failed outbound pings before the associated + /// connection is deemed unhealthy, indicating to the `Swarm` that it + /// should be closed. + pub(crate) max_failures: NonZeroU32, +} + +impl Config { + /// Creates a new [`Config`] with the following default settings: + /// + /// * [`Config::with_interval`] 15s + /// * [`Config::with_timeout`] 20s + /// * [`Config::with_max_failures`] 1 + /// * [`Config::with_keep_alive`] false + /// + /// These settings have the following effect: + /// + /// * A ping is sent every 15 seconds on a healthy connection. + /// * Every ping sent must yield a response within 20 seconds in order to + /// be successful. + /// * A single ping failure is sufficient for the connection to be subject + /// to being closed. + /// * The connection may be closed at any time as far as the ping protocol + /// is concerned, i.e. the ping protocol itself does not keep the + /// connection alive. + pub fn new() -> Self { + Self { + timeout: Duration::from_secs(20), + interval: Duration::from_secs(15), + max_failures: NonZeroU32::new(1).expect("1 != 0"), } } + + /// Sets the ping timeout. + pub fn with_timeout(mut self, d: Duration) -> Self { + self.timeout = d; + self + } + + /// Sets the ping interval. + pub fn with_interval(mut self, d: Duration) -> Self { + self.interval = d; + self + } + + /// Sets the maximum number of consecutive ping failures upon which the remote + /// peer is considered unreachable and the connection closed. + pub fn with_max_failures(mut self, n: NonZeroU32) -> Self { + self.max_failures = n; + self + } } -impl Default for Behaviour { +impl Default for Config { fn default() -> Self { - Self::new(Config::new()) + Self::new() + } +} + +/// The successful result of processing an inbound or outbound ping. +#[derive(Debug)] +pub enum Success { + /// Received a ping and sent back a pong. + Pong, + /// Sent a ping and received back a pong. + /// + /// Includes the round-trip time. + Ping { rtt: Duration }, +} + +/// An outbound ping failure. +#[derive(Debug)] +pub enum Failure { + /// The ping timed out, i.e. no response was received within the + /// configured ping timeout. + Timeout, + /// The peer does not support the ping protocol. + Unsupported, + /// The ping failed for reasons other than a timeout. + Other { + error: Box, + }, +} + +impl From for Failure { + fn from(e: io::Error) -> Self { + Failure::Other { error: Box::new(e) } + } +} + +impl fmt::Display for Failure { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Failure::Timeout => f.write_str("Ping timeout"), + Failure::Other { error } => write!(f, "Ping error: {}", error), + Failure::Unsupported => write!(f, "Ping protocol not supported"), + } + } +} + +impl Error for Failure { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Failure::Timeout => None, + Failure::Other { error } => Some(&**error), + Failure::Unsupported => None, + } } } @@ -118,11 +232,104 @@ impl NetworkBehaviour for Behaviour { type OutEvent = Event; fn new_handler(&mut self) -> Self::ConnectionHandler { - Handler::new(self.config.clone()) + from_fn::from_fn(std::str::from_utf8(PROTOCOL_NAME).unwrap()) + .without_state() + .with_streaming_inbound_handler(1, |stream, _, _, _| { + futures::stream::try_unfold(stream, |stream| async move { + let stream = recv_ping(stream).await?; + + Ok(Some((Success::Pong, stream))) + }) + }) + .with_streaming_outbound_handler(1, { + let interval = self.config.interval; + let timeout = self.config.timeout; + + move |stream, _, _, _, _| { + futures::stream::try_unfold(stream, move |stream| async move { + Delay::new(interval).await; + + let ping = send_ping(stream); + futures::pin_mut!(ping); + + match futures::future::select(Delay::new(timeout), ping).await { + Either::Left(((), _unfinished_ping)) => Err(Failure::Timeout), + Either::Right((Ok((stream, rtt)), _)) => { + Ok(Some((Success::Ping { rtt }, stream))) + } + Either::Right((Err(e), _)) => { + Err(Failure::Other { error: Box::new(e) }) + } + } + }) + } + }) + } + + fn inject_connection_established( + &mut self, + peer: &PeerId, + connection: &ConnectionId, + _: &ConnectedPoint, + _: Option<&Vec>, + _: usize, + ) { + self.actions.push_back(start_ping_action(peer, connection)); } - fn inject_event(&mut self, peer: PeerId, _: ConnectionId, result: Result) { - self.events.push_front(Event { peer, result }) + fn inject_event( + &mut self, + peer: PeerId, + connection: ConnectionId, + event: from_fn::OutEvent, + ) { + match event { + from_fn::OutEvent::InboundEmitted(Ok(success)) => { + self.actions + .push_back(NetworkBehaviourAction::GenerateEvent(Event { + peer, + result: Ok(success), + })) + } + from_fn::OutEvent::OutboundEmitted(Ok(success)) => { + self.actions + .push_back(NetworkBehaviourAction::GenerateEvent(Event { + peer, + result: Ok(success), + })); + self.reset_num_failures(peer, connection); + } + from_fn::OutEvent::InboundEmitted(Err(e)) => { + log::debug!("Inbound ping error: {:?}", e); + } + from_fn::OutEvent::OutboundEmitted(Err(e)) => { + self.record_failure(peer, connection, e); + } + from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Timeout(())) => { + self.record_failure(peer, connection, Failure::Timeout); + } + from_fn::OutEvent::FailedToOpen(from_fn::OpenError::Unsupported { + open_info: (), + .. + }) => { + self.record_failure(peer, connection, Failure::Unsupported); + } + from_fn::OutEvent::FailedToOpen(from_fn::OpenError::NegotiationFailed((), error)) => { + self.record_failure( + peer, + connection, + Failure::Other { + error: Box::new(error), + }, + ); + } + from_fn::OutEvent::FailedToOpen(from_fn::OpenError::LimitExceeded { + open_info: (), + .. + }) => { + unreachable!("We only ever open a new stream if the old one is dead.") + } + } } fn poll( @@ -130,18 +337,54 @@ impl NetworkBehaviour for Behaviour { _: &mut Context<'_>, _: &mut impl PollParameters, ) -> Poll> { - if let Some(e) = self.events.pop_back() { - let Event { result, peer } = &e; + if let Some(action) = self.actions.pop_front() { + return Poll::Ready(action); + } - match result { - Ok(Success::Ping { .. }) => log::debug!("Ping sent to {:?}", peer), - Ok(Success::Pong) => log::debug!("Ping received from {:?}", peer), - _ => {} - } + for ((peer, connection), (failures, pending_errors)) in self.failures.iter_mut() { + // Check for outbound ping failures. + if let Some(error) = pending_errors.pop_back() { + log::debug!("Ping failure: {:?}", error); - Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)) - } else { - Poll::Pending + *failures += 1; + + // Note: For backward-compatibility, with configured + // `max_failures == 1`, the first failure is always "free" + // and silent. This allows peers who still use a new substream + // for each ping to have successful ping exchanges with peers + // that use a single substream, since every successful ping + // resets `failures` to `0`, while at the same time emitting + // events only for `max_failures - 1` failures, as before. + if *failures > 1 || self.config.max_failures.get() > 1 { + if *failures >= self.config.max_failures.get() { + log::debug!("Too many failures ({}). Closing connection.", failures); + return Poll::Ready(NetworkBehaviourAction::CloseConnection { + peer_id: *peer, + connection: CloseConnection::One(*connection), + }); + } + } + + self.actions.push_back(start_ping_action(peer, connection)); + + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event { + peer: *peer, + result: Err(error), + })); + } } + + Poll::Pending + } +} + +fn start_ping_action( + peer: &PeerId, + connection: &ConnectionId, +) -> NetworkBehaviourAction { + NetworkBehaviourAction::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::One(*connection), + event: from_fn::InEvent::NewOutbound(()), } }