From 5cb4886ab264ef0e70245c666d4fdc9afe059efa Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 9 Jun 2022 15:12:03 +0200 Subject: [PATCH] protocols/kad: Limit # of inbound substreams to 32 (#2699) * protocols/kad/: Split into outbound and inbound substreams * protocols/kad: Limit # of inbound substreams to 32 A remote node may still send more than 32 requests in parallel by using more than one connection or by sending more than one request per stream. * protocols/kad: Favor new substreams over old ones waiting for reuse When a new inbound substream comes in and the limit of total inbound substreams is hit, try to find an old inbound substream waiting to be reused. In such case, replace the old with the new. In case no such old substream exists, drop the new one. --- protocols/kad/CHANGELOG.md | 6 + protocols/kad/Cargo.toml | 2 +- protocols/kad/src/handler.rs | 497 +++++++++++++++++++++++------------ 3 files changed, 334 insertions(+), 171 deletions(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 7140fd1238f..77e1cd31be3 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.37.1 - unreleased + +- Limit # of inbound streams to 32. [See PR 2699]. + +[PR 2699]: https://github.com/libp2p/rust-libp2p/pull/2699 + # 0.37.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index a7410670e62..7480632b4fe 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = "1.56.1" description = "Kademlia protocol for libp2p" -version = "0.37.0" +version = "0.37.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 30d22c9067d..bcadb57f44c 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -39,6 +39,8 @@ use std::{ error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration, }; +const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32; + /// A prototype from which [`KademliaHandler`]s can be constructed. pub struct KademliaHandlerProto { config: KademliaHandlerConfig, @@ -57,8 +59,8 @@ impl KademliaHandlerProto { impl IntoConnectionHandler for KademliaHandlerProto { type Handler = KademliaHandler; - fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - KademliaHandler::new(self.config, endpoint.clone()) + fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id) } fn inbound_protocol(&self) -> ::InboundProtocol { @@ -84,8 +86,11 @@ pub struct KademliaHandler { /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, - /// List of active substreams with the state they are in. - substreams: Vec>, + /// List of active outbound substreams with the state they are in. + outbound_substreams: Vec>, + + /// List of active inbound substreams with the state they are in. + inbound_substreams: Vec, /// Until when to keep the connection alive. keep_alive: KeepAlive, @@ -94,6 +99,9 @@ pub struct KademliaHandler { /// is associated with. endpoint: ConnectedPoint, + /// The [`PeerId`] of the remote. + remote_peer_id: PeerId, + /// The current state of protocol confirmation. protocol_status: ProtocolStatus, } @@ -125,65 +133,86 @@ pub struct KademliaHandlerConfig { pub idle_timeout: Duration, } -/// State of an active substream, opened either by us or by the remote. -enum SubstreamState { +/// State of an active outbound substream. +enum OutboundSubstreamState { /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. - OutPendingOpen(KadRequestMsg, Option), + PendingOpen(KadRequestMsg, Option), /// Waiting to send a message to the remote. - OutPendingSend( + PendingSend( KadOutStreamSink, KadRequestMsg, Option, ), /// Waiting to flush the substream so that the data arrives to the remote. - OutPendingFlush(KadOutStreamSink, Option), + PendingFlush(KadOutStreamSink, Option), /// Waiting for an answer back from the remote. // TODO: add timeout - OutWaitingAnswer(KadOutStreamSink, TUserData), + WaitingAnswer(KadOutStreamSink, TUserData), /// An error happened on the substream and we should report the error to the user. - OutReportError(KademliaHandlerQueryErr, TUserData), + ReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. - OutClosing(KadOutStreamSink), + Closing(KadOutStreamSink), +} + +/// State of an active inbound substream. +enum InboundSubstreamState { /// Waiting for a request from the remote. - InWaitingMessage(UniqueConnecId, KadInStreamSink), - /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. - InWaitingUser(UniqueConnecId, KadInStreamSink), + WaitingMessage { + /// Whether it is the first message to be awaited on this stream. + first: bool, + connection_id: UniqueConnecId, + substream: KadInStreamSink, + }, + /// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response. + WaitingUser(UniqueConnecId, KadInStreamSink), /// Waiting to send an answer back to the remote. - InPendingSend( + PendingSend( UniqueConnecId, KadInStreamSink, KadResponseMsg, ), /// Waiting to flush an answer back to the remote. - InPendingFlush(UniqueConnecId, KadInStreamSink), + PendingFlush(UniqueConnecId, KadInStreamSink), /// The substream is being closed. - InClosing(KadInStreamSink), + Closing(KadInStreamSink), } -impl SubstreamState { +impl OutboundSubstreamState { /// Tries to close the substream. /// /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { - SubstreamState::OutPendingOpen(_, _) | SubstreamState::OutReportError(_, _) => { - Poll::Ready(()) - } - SubstreamState::OutPendingSend(ref mut stream, _, _) - | SubstreamState::OutPendingFlush(ref mut stream, _) - | SubstreamState::OutWaitingAnswer(ref mut stream, _) - | SubstreamState::OutClosing(ref mut stream) => { + OutboundSubstreamState::PendingOpen(_, _) + | OutboundSubstreamState::ReportError(_, _) => Poll::Ready(()), + OutboundSubstreamState::PendingSend(ref mut stream, _, _) + | OutboundSubstreamState::PendingFlush(ref mut stream, _) + | OutboundSubstreamState::WaitingAnswer(ref mut stream, _) + | OutboundSubstreamState::Closing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, } } - SubstreamState::InWaitingMessage(_, ref mut stream) - | SubstreamState::InWaitingUser(_, ref mut stream) - | SubstreamState::InPendingSend(_, ref mut stream, _) - | SubstreamState::InPendingFlush(_, ref mut stream) - | SubstreamState::InClosing(ref mut stream) => { + } + } +} + +impl InboundSubstreamState { + /// Tries to close the substream. + /// + /// If the substream is not ready to be closed, returns it back. + fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match self { + InboundSubstreamState::WaitingMessage { + substream: ref mut stream, + .. + } + | InboundSubstreamState::WaitingUser(_, ref mut stream) + | InboundSubstreamState::PendingSend(_, ref mut stream, _) + | InboundSubstreamState::PendingFlush(_, ref mut stream) + | InboundSubstreamState::Closing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, @@ -452,14 +481,20 @@ struct UniqueConnecId(u64); impl KademliaHandler { /// Create a [`KademliaHandler`] using the given configuration. - pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self { + pub fn new( + config: KademliaHandlerConfig, + endpoint: ConnectedPoint, + remote_peer_id: PeerId, + ) -> Self { let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); KademliaHandler { config, endpoint, + remote_peer_id, next_connec_unique_id: UniqueConnecId(0), - substreams: Vec::new(), + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), keep_alive, protocol_status: ProtocolStatus::Unconfirmed, } @@ -493,8 +528,10 @@ where protocol: >::Output, (msg, user_data): Self::OutboundOpenInfo, ) { - self.substreams - .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); + self.outbound_substreams + .push(OutboundSubstreamState::PendingSend( + protocol, msg, user_data, + )); if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -515,84 +552,126 @@ where EitherOutput::Second(p) => void::unreachable(p), }; - debug_assert!(self.config.allow_listening); - let connec_unique_id = self.next_connec_unique_id; - self.next_connec_unique_id.0 += 1; - self.substreams - .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want // the behaviour to add this peer to the routing table, if possible. self.protocol_status = ProtocolStatus::Confirmed; } + + if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS { + if let Some(position) = self.inbound_substreams.iter().position(|s| { + matches!( + s, + // An inbound substream waiting to be reused. + InboundSubstreamState::WaitingMessage { first: false, .. } + ) + }) { + self.inbound_substreams.remove(position); + log::warn!( + "New inbound substream to {:?} exceeds inbound substream limit. \ + Removed older substream waiting to be reused.", + self.remote_peer_id, + ) + } else { + log::warn!( + "New inbound substream to {:?} exceeds inbound substream limit. \ + No older substream waiting to be reused. Dropping new substream.", + self.remote_peer_id, + ); + return; + } + } + + debug_assert!(self.config.allow_listening); + let connec_unique_id = self.next_connec_unique_id; + self.next_connec_unique_id.0 += 1; + self.inbound_substreams + .push(InboundSubstreamState::WaitingMessage { + first: true, + connection_id: connec_unique_id, + substream: protocol, + }); } fn inject_event(&mut self, message: KademliaHandlerIn) { match message { KademliaHandlerIn::Reset(request_id) => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { // TODO: we don't properly close down the substream let waker = futures::task::noop_waker(); let mut cx = Context::from_waker(&waker); - let _ = self.substreams.remove(pos).try_close(&mut cx); + let _ = self.inbound_substreams.remove(pos).try_close(&mut cx); } } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; let msg = KadResponseMsg::FindNode { closer_peers }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, provider_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) + if conn_id == &request_id.connec_unique_id => + { + true + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; @@ -600,40 +679,45 @@ where closer_peers, provider_peers, }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, None)); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, closer_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; @@ -641,8 +725,8 @@ where record, closer_peers, }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::PutRecordRes { @@ -650,24 +734,29 @@ where request_id, value, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) + if conn_id == &request_id.connec_unique_id => + { + true + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; let msg = KadResponseMsg::PutValue { key, value }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } } @@ -681,8 +770,8 @@ where // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying if let Some(user_data) = user_data { - self.substreams - .push(SubstreamState::OutReportError(error.into(), user_data)); + self.outbound_substreams + .push(OutboundSubstreamState::ReportError(error.into(), user_data)); } } @@ -701,7 +790,7 @@ where Self::Error, >, > { - if self.substreams.is_empty() { + if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { return Poll::Pending; } @@ -714,25 +803,26 @@ where )); } - // We remove each element from `substreams` one by one and add them back. - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); + // We remove each element from `outbound_substreams` one by one and add them back. + for n in (0..self.outbound_substreams.len()).rev() { + let mut substream = self.outbound_substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.protocol_config.clone(), cx) { + match advance_outbound_substream(substream, self.config.protocol_config.clone(), cx) + { (Some(new_state), Some(event), _) => { - self.substreams.push(new_state); + self.outbound_substreams.push(new_state); return Poll::Ready(event); } (None, Some(event), _) => { - if self.substreams.is_empty() { + if self.outbound_substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } return Poll::Ready(event); } (Some(new_state), None, false) => { - self.substreams.push(new_state); + self.outbound_substreams.push(new_state); break; } (Some(new_state), None, true) => { @@ -746,7 +836,39 @@ where } } - if self.substreams.is_empty() { + // We remove each element from `inbound_substreams` one by one and add them back. + for n in (0..self.inbound_substreams.len()).rev() { + let mut substream = self.inbound_substreams.swap_remove(n); + + loop { + match advance_inbound_substream(substream, cx) { + (Some(new_state), Some(event), _) => { + self.inbound_substreams.push(new_state); + return Poll::Ready(event); + } + (None, Some(event), _) => { + if self.inbound_substreams.is_empty() { + self.keep_alive = + KeepAlive::Until(Instant::now() + self.config.idle_timeout); + } + return Poll::Ready(event); + } + (Some(new_state), None, false) => { + self.inbound_substreams.push(new_state); + break; + } + (Some(new_state), None, true) => { + substream = new_state; + continue; + } + (None, None, _) => { + break; + } + } + } + } + + if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { // We destroyed all substreams in this function. self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } else { @@ -767,16 +889,16 @@ impl Default for KademliaHandlerConfig { } } -/// Advances one substream. +/// Advances one outbound substream. /// /// Returns the new state for that substream, an event to generate, and whether the substream /// should be polled again. -fn advance_substream( - state: SubstreamState, +fn advance_outbound_substream( + state: OutboundSubstreamState, upgrade: KademliaProtocolConfig, cx: &mut Context<'_>, ) -> ( - Option>, + Option>, Option< ConnectionHandlerEvent< KademliaProtocolConfig, @@ -788,17 +910,17 @@ fn advance_substream( bool, ) { match state { - SubstreamState::OutPendingOpen(msg, user_data) => { + OutboundSubstreamState::PendingOpen(msg, user_data) => { let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(upgrade, (msg, user_data)), }; (None, Some(ev), false) } - SubstreamState::OutPendingSend(mut substream, msg, user_data) => { + OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), + Some(OutboundSubstreamState::PendingFlush(substream, user_data)), None, true, ), @@ -814,7 +936,9 @@ fn advance_substream( } }, Poll::Pending => ( - Some(SubstreamState::OutPendingSend(substream, msg, user_data)), + Some(OutboundSubstreamState::PendingSend( + substream, msg, user_data, + )), None, false, ), @@ -830,21 +954,21 @@ fn advance_substream( } } } - SubstreamState::OutPendingFlush(mut substream, user_data) => { + OutboundSubstreamState::PendingFlush(mut substream, user_data) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { if let Some(user_data) = user_data { ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)), None, true, ) } else { - (Some(SubstreamState::OutClosing(substream)), None, true) + (Some(OutboundSubstreamState::Closing(substream)), None, true) } } Poll::Pending => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), + Some(OutboundSubstreamState::PendingFlush(substream, user_data)), None, false, ), @@ -860,10 +984,10 @@ fn advance_substream( } } } - SubstreamState::OutWaitingAnswer(mut substream, user_data) => { + OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => { match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { - let new_state = SubstreamState::OutClosing(substream); + let new_state = OutboundSubstreamState::Closing(substream); let event = process_kad_response(msg, user_data); ( Some(new_state), @@ -872,7 +996,7 @@ fn advance_substream( ) } Poll::Pending => ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)), None, false, ), @@ -892,86 +1016,119 @@ fn advance_substream( } } } - SubstreamState::OutReportError(error, user_data) => { + OutboundSubstreamState::ReportError(error, user_data) => { let event = KademliaHandlerEvent::QueryError { error, user_data }; (None, Some(ConnectionHandlerEvent::Custom(event)), false) } - SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) - { - Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false), - Poll::Ready(Err(_)) => (None, None, false), - }, - SubstreamState::InWaitingMessage(id, mut substream) => { - match Stream::poll_next(Pin::new(&mut substream), cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Ok(ev) = process_kad_request(msg, id) { - ( - Some(SubstreamState::InWaitingUser(id, substream)), - Some(ConnectionHandlerEvent::Custom(ev)), - false, - ) - } else { - (Some(SubstreamState::InClosing(substream)), None, true) - } - } - Poll::Pending => ( - Some(SubstreamState::InWaitingMessage(id, substream)), - None, - false, - ), - Poll::Ready(None) => { - trace!("Inbound substream: EOF"); - (None, None, false) - } - Poll::Ready(Some(Err(e))) => { - trace!("Inbound substream error: {:?}", e); - (None, None, false) - } + OutboundSubstreamState::Closing(mut stream) => { + match Sink::poll_close(Pin::new(&mut stream), cx) { + Poll::Ready(Ok(())) => (None, None, false), + Poll::Pending => (Some(OutboundSubstreamState::Closing(stream)), None, false), + Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InWaitingUser(id, substream) => ( - Some(SubstreamState::InWaitingUser(id, substream)), + } +} +/// Advances one inbound substream. +/// +/// Returns the new state for that substream, an event to generate, and whether the substream +/// should be polled again. +fn advance_inbound_substream( + state: InboundSubstreamState, + cx: &mut Context<'_>, +) -> ( + Option, + Option< + ConnectionHandlerEvent< + KademliaProtocolConfig, + (KadRequestMsg, Option), + KademliaHandlerEvent, + io::Error, + >, + >, + bool, +) { + match state { + InboundSubstreamState::WaitingMessage { + first, + connection_id, + mut substream, + } => match Stream::poll_next(Pin::new(&mut substream), cx) { + Poll::Ready(Some(Ok(msg))) => { + if let Ok(ev) = process_kad_request(msg, connection_id) { + ( + Some(InboundSubstreamState::WaitingUser(connection_id, substream)), + Some(ConnectionHandlerEvent::Custom(ev)), + false, + ) + } else { + (Some(InboundSubstreamState::Closing(substream)), None, true) + } + } + Poll::Pending => ( + Some(InboundSubstreamState::WaitingMessage { + first, + connection_id, + substream, + }), + None, + false, + ), + Poll::Ready(None) => { + trace!("Inbound substream: EOF"); + (None, None, false) + } + Poll::Ready(Some(Err(e))) => { + trace!("Inbound substream error: {:?}", e); + (None, None, false) + } + }, + InboundSubstreamState::WaitingUser(id, substream) => ( + Some(InboundSubstreamState::WaitingUser(id, substream)), None, false, ), - SubstreamState::InPendingSend(id, mut substream, msg) => { + InboundSubstreamState::PendingSend(id, mut substream, msg) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(SubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::PendingFlush(id, substream)), None, true, ), Err(_) => (None, None, false), }, Poll::Pending => ( - Some(SubstreamState::InPendingSend(id, substream, msg)), + Some(InboundSubstreamState::PendingSend(id, substream, msg)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InPendingFlush(id, mut substream) => { + InboundSubstreamState::PendingFlush(id, mut substream) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => ( - Some(SubstreamState::InWaitingMessage(id, substream)), + Some(InboundSubstreamState::WaitingMessage { + first: false, + connection_id: id, + substream, + }), None, true, ), Poll::Pending => ( - Some(SubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::PendingFlush(id, substream)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InClosing(mut stream) => { + InboundSubstreamState::Closing(mut stream) => { match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false), + Poll::Pending => (Some(InboundSubstreamState::Closing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), } }