diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 7140fd1238f..77e1cd31be3 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.37.1 - unreleased + +- Limit # of inbound streams to 32. [See PR 2699]. + +[PR 2699]: https://github.com/libp2p/rust-libp2p/pull/2699 + # 0.37.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index a7410670e62..7480632b4fe 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = "1.56.1" description = "Kademlia protocol for libp2p" -version = "0.37.0" +version = "0.37.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 30d22c9067d..bcadb57f44c 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -39,6 +39,8 @@ use std::{ error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration, }; +const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32; + /// A prototype from which [`KademliaHandler`]s can be constructed. pub struct KademliaHandlerProto { config: KademliaHandlerConfig, @@ -57,8 +59,8 @@ impl KademliaHandlerProto { impl IntoConnectionHandler for KademliaHandlerProto { type Handler = KademliaHandler; - fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { - KademliaHandler::new(self.config, endpoint.clone()) + fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id) } fn inbound_protocol(&self) -> ::InboundProtocol { @@ -84,8 +86,11 @@ pub struct KademliaHandler { /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, - /// List of active substreams with the state they are in. - substreams: Vec>, + /// List of active outbound substreams with the state they are in. + outbound_substreams: Vec>, + + /// List of active inbound substreams with the state they are in. + inbound_substreams: Vec, /// Until when to keep the connection alive. keep_alive: KeepAlive, @@ -94,6 +99,9 @@ pub struct KademliaHandler { /// is associated with. endpoint: ConnectedPoint, + /// The [`PeerId`] of the remote. + remote_peer_id: PeerId, + /// The current state of protocol confirmation. protocol_status: ProtocolStatus, } @@ -125,65 +133,86 @@ pub struct KademliaHandlerConfig { pub idle_timeout: Duration, } -/// State of an active substream, opened either by us or by the remote. -enum SubstreamState { +/// State of an active outbound substream. +enum OutboundSubstreamState { /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. - OutPendingOpen(KadRequestMsg, Option), + PendingOpen(KadRequestMsg, Option), /// Waiting to send a message to the remote. - OutPendingSend( + PendingSend( KadOutStreamSink, KadRequestMsg, Option, ), /// Waiting to flush the substream so that the data arrives to the remote. - OutPendingFlush(KadOutStreamSink, Option), + PendingFlush(KadOutStreamSink, Option), /// Waiting for an answer back from the remote. // TODO: add timeout - OutWaitingAnswer(KadOutStreamSink, TUserData), + WaitingAnswer(KadOutStreamSink, TUserData), /// An error happened on the substream and we should report the error to the user. - OutReportError(KademliaHandlerQueryErr, TUserData), + ReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. - OutClosing(KadOutStreamSink), + Closing(KadOutStreamSink), +} + +/// State of an active inbound substream. +enum InboundSubstreamState { /// Waiting for a request from the remote. - InWaitingMessage(UniqueConnecId, KadInStreamSink), - /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. - InWaitingUser(UniqueConnecId, KadInStreamSink), + WaitingMessage { + /// Whether it is the first message to be awaited on this stream. + first: bool, + connection_id: UniqueConnecId, + substream: KadInStreamSink, + }, + /// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response. + WaitingUser(UniqueConnecId, KadInStreamSink), /// Waiting to send an answer back to the remote. - InPendingSend( + PendingSend( UniqueConnecId, KadInStreamSink, KadResponseMsg, ), /// Waiting to flush an answer back to the remote. - InPendingFlush(UniqueConnecId, KadInStreamSink), + PendingFlush(UniqueConnecId, KadInStreamSink), /// The substream is being closed. - InClosing(KadInStreamSink), + Closing(KadInStreamSink), } -impl SubstreamState { +impl OutboundSubstreamState { /// Tries to close the substream. /// /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { - SubstreamState::OutPendingOpen(_, _) | SubstreamState::OutReportError(_, _) => { - Poll::Ready(()) - } - SubstreamState::OutPendingSend(ref mut stream, _, _) - | SubstreamState::OutPendingFlush(ref mut stream, _) - | SubstreamState::OutWaitingAnswer(ref mut stream, _) - | SubstreamState::OutClosing(ref mut stream) => { + OutboundSubstreamState::PendingOpen(_, _) + | OutboundSubstreamState::ReportError(_, _) => Poll::Ready(()), + OutboundSubstreamState::PendingSend(ref mut stream, _, _) + | OutboundSubstreamState::PendingFlush(ref mut stream, _) + | OutboundSubstreamState::WaitingAnswer(ref mut stream, _) + | OutboundSubstreamState::Closing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, } } - SubstreamState::InWaitingMessage(_, ref mut stream) - | SubstreamState::InWaitingUser(_, ref mut stream) - | SubstreamState::InPendingSend(_, ref mut stream, _) - | SubstreamState::InPendingFlush(_, ref mut stream) - | SubstreamState::InClosing(ref mut stream) => { + } + } +} + +impl InboundSubstreamState { + /// Tries to close the substream. + /// + /// If the substream is not ready to be closed, returns it back. + fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match self { + InboundSubstreamState::WaitingMessage { + substream: ref mut stream, + .. + } + | InboundSubstreamState::WaitingUser(_, ref mut stream) + | InboundSubstreamState::PendingSend(_, ref mut stream, _) + | InboundSubstreamState::PendingFlush(_, ref mut stream) + | InboundSubstreamState::Closing(ref mut stream) => { match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, @@ -452,14 +481,20 @@ struct UniqueConnecId(u64); impl KademliaHandler { /// Create a [`KademliaHandler`] using the given configuration. - pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self { + pub fn new( + config: KademliaHandlerConfig, + endpoint: ConnectedPoint, + remote_peer_id: PeerId, + ) -> Self { let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); KademliaHandler { config, endpoint, + remote_peer_id, next_connec_unique_id: UniqueConnecId(0), - substreams: Vec::new(), + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), keep_alive, protocol_status: ProtocolStatus::Unconfirmed, } @@ -493,8 +528,10 @@ where protocol: >::Output, (msg, user_data): Self::OutboundOpenInfo, ) { - self.substreams - .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); + self.outbound_substreams + .push(OutboundSubstreamState::PendingSend( + protocol, msg, user_data, + )); if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -515,84 +552,126 @@ where EitherOutput::Second(p) => void::unreachable(p), }; - debug_assert!(self.config.allow_listening); - let connec_unique_id = self.next_connec_unique_id; - self.next_connec_unique_id.0 += 1; - self.substreams - .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); if let ProtocolStatus::Unconfirmed = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want // the behaviour to add this peer to the routing table, if possible. self.protocol_status = ProtocolStatus::Confirmed; } + + if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS { + if let Some(position) = self.inbound_substreams.iter().position(|s| { + matches!( + s, + // An inbound substream waiting to be reused. + InboundSubstreamState::WaitingMessage { first: false, .. } + ) + }) { + self.inbound_substreams.remove(position); + log::warn!( + "New inbound substream to {:?} exceeds inbound substream limit. \ + Removed older substream waiting to be reused.", + self.remote_peer_id, + ) + } else { + log::warn!( + "New inbound substream to {:?} exceeds inbound substream limit. \ + No older substream waiting to be reused. Dropping new substream.", + self.remote_peer_id, + ); + return; + } + } + + debug_assert!(self.config.allow_listening); + let connec_unique_id = self.next_connec_unique_id; + self.next_connec_unique_id.0 += 1; + self.inbound_substreams + .push(InboundSubstreamState::WaitingMessage { + first: true, + connection_id: connec_unique_id, + substream: protocol, + }); } fn inject_event(&mut self, message: KademliaHandlerIn) { match message { KademliaHandlerIn::Reset(request_id) => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { // TODO: we don't properly close down the substream let waker = futures::task::noop_waker(); let mut cx = Context::from_waker(&waker); - let _ = self.substreams.remove(pos).try_close(&mut cx); + let _ = self.inbound_substreams.remove(pos).try_close(&mut cx); } } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::FindNodeRes { closer_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; let msg = KadResponseMsg::FindNode { closer_peers }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetProvidersRes { closer_peers, provider_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) + if conn_id == &request_id.connec_unique_id => + { + true + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; @@ -600,40 +679,45 @@ where closer_peers, provider_peers, }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, None)); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; - self.substreams - .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); + self.outbound_substreams + .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, closer_peers, request_id, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) => { - conn_id == &request_id.connec_unique_id - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) => { + conn_id == &request_id.connec_unique_id + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; @@ -641,8 +725,8 @@ where record, closer_peers, }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::PutRecordRes { @@ -650,24 +734,29 @@ where request_id, value, } => { - let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + let pos = self + .inbound_substreams + .iter() + .position(|state| match state { + InboundSubstreamState::WaitingUser(ref conn_id, _) + if conn_id == &request_id.connec_unique_id => + { + true + } + _ => false, + }); if let Some(pos) = pos { - let (conn_id, substream) = match self.substreams.remove(pos) { - SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), + let (conn_id, substream) = match self.inbound_substreams.remove(pos) { + InboundSubstreamState::WaitingUser(conn_id, substream) => { + (conn_id, substream) + } _ => unreachable!(), }; let msg = KadResponseMsg::PutValue { key, value }; - self.substreams - .push(SubstreamState::InPendingSend(conn_id, substream, msg)); + self.inbound_substreams + .push(InboundSubstreamState::PendingSend(conn_id, substream, msg)); } } } @@ -681,8 +770,8 @@ where // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying if let Some(user_data) = user_data { - self.substreams - .push(SubstreamState::OutReportError(error.into(), user_data)); + self.outbound_substreams + .push(OutboundSubstreamState::ReportError(error.into(), user_data)); } } @@ -701,7 +790,7 @@ where Self::Error, >, > { - if self.substreams.is_empty() { + if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { return Poll::Pending; } @@ -714,25 +803,26 @@ where )); } - // We remove each element from `substreams` one by one and add them back. - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); + // We remove each element from `outbound_substreams` one by one and add them back. + for n in (0..self.outbound_substreams.len()).rev() { + let mut substream = self.outbound_substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.protocol_config.clone(), cx) { + match advance_outbound_substream(substream, self.config.protocol_config.clone(), cx) + { (Some(new_state), Some(event), _) => { - self.substreams.push(new_state); + self.outbound_substreams.push(new_state); return Poll::Ready(event); } (None, Some(event), _) => { - if self.substreams.is_empty() { + if self.outbound_substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } return Poll::Ready(event); } (Some(new_state), None, false) => { - self.substreams.push(new_state); + self.outbound_substreams.push(new_state); break; } (Some(new_state), None, true) => { @@ -746,7 +836,39 @@ where } } - if self.substreams.is_empty() { + // We remove each element from `inbound_substreams` one by one and add them back. + for n in (0..self.inbound_substreams.len()).rev() { + let mut substream = self.inbound_substreams.swap_remove(n); + + loop { + match advance_inbound_substream(substream, cx) { + (Some(new_state), Some(event), _) => { + self.inbound_substreams.push(new_state); + return Poll::Ready(event); + } + (None, Some(event), _) => { + if self.inbound_substreams.is_empty() { + self.keep_alive = + KeepAlive::Until(Instant::now() + self.config.idle_timeout); + } + return Poll::Ready(event); + } + (Some(new_state), None, false) => { + self.inbound_substreams.push(new_state); + break; + } + (Some(new_state), None, true) => { + substream = new_state; + continue; + } + (None, None, _) => { + break; + } + } + } + } + + if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() { // We destroyed all substreams in this function. self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } else { @@ -767,16 +889,16 @@ impl Default for KademliaHandlerConfig { } } -/// Advances one substream. +/// Advances one outbound substream. /// /// Returns the new state for that substream, an event to generate, and whether the substream /// should be polled again. -fn advance_substream( - state: SubstreamState, +fn advance_outbound_substream( + state: OutboundSubstreamState, upgrade: KademliaProtocolConfig, cx: &mut Context<'_>, ) -> ( - Option>, + Option>, Option< ConnectionHandlerEvent< KademliaProtocolConfig, @@ -788,17 +910,17 @@ fn advance_substream( bool, ) { match state { - SubstreamState::OutPendingOpen(msg, user_data) => { + OutboundSubstreamState::PendingOpen(msg, user_data) => { let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(upgrade, (msg, user_data)), }; (None, Some(ev), false) } - SubstreamState::OutPendingSend(mut substream, msg, user_data) => { + OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), + Some(OutboundSubstreamState::PendingFlush(substream, user_data)), None, true, ), @@ -814,7 +936,9 @@ fn advance_substream( } }, Poll::Pending => ( - Some(SubstreamState::OutPendingSend(substream, msg, user_data)), + Some(OutboundSubstreamState::PendingSend( + substream, msg, user_data, + )), None, false, ), @@ -830,21 +954,21 @@ fn advance_substream( } } } - SubstreamState::OutPendingFlush(mut substream, user_data) => { + OutboundSubstreamState::PendingFlush(mut substream, user_data) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { if let Some(user_data) = user_data { ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)), None, true, ) } else { - (Some(SubstreamState::OutClosing(substream)), None, true) + (Some(OutboundSubstreamState::Closing(substream)), None, true) } } Poll::Pending => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), + Some(OutboundSubstreamState::PendingFlush(substream, user_data)), None, false, ), @@ -860,10 +984,10 @@ fn advance_substream( } } } - SubstreamState::OutWaitingAnswer(mut substream, user_data) => { + OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => { match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { - let new_state = SubstreamState::OutClosing(substream); + let new_state = OutboundSubstreamState::Closing(substream); let event = process_kad_response(msg, user_data); ( Some(new_state), @@ -872,7 +996,7 @@ fn advance_substream( ) } Poll::Pending => ( - Some(SubstreamState::OutWaitingAnswer(substream, user_data)), + Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)), None, false, ), @@ -892,86 +1016,119 @@ fn advance_substream( } } } - SubstreamState::OutReportError(error, user_data) => { + OutboundSubstreamState::ReportError(error, user_data) => { let event = KademliaHandlerEvent::QueryError { error, user_data }; (None, Some(ConnectionHandlerEvent::Custom(event)), false) } - SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) - { - Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false), - Poll::Ready(Err(_)) => (None, None, false), - }, - SubstreamState::InWaitingMessage(id, mut substream) => { - match Stream::poll_next(Pin::new(&mut substream), cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Ok(ev) = process_kad_request(msg, id) { - ( - Some(SubstreamState::InWaitingUser(id, substream)), - Some(ConnectionHandlerEvent::Custom(ev)), - false, - ) - } else { - (Some(SubstreamState::InClosing(substream)), None, true) - } - } - Poll::Pending => ( - Some(SubstreamState::InWaitingMessage(id, substream)), - None, - false, - ), - Poll::Ready(None) => { - trace!("Inbound substream: EOF"); - (None, None, false) - } - Poll::Ready(Some(Err(e))) => { - trace!("Inbound substream error: {:?}", e); - (None, None, false) - } + OutboundSubstreamState::Closing(mut stream) => { + match Sink::poll_close(Pin::new(&mut stream), cx) { + Poll::Ready(Ok(())) => (None, None, false), + Poll::Pending => (Some(OutboundSubstreamState::Closing(stream)), None, false), + Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InWaitingUser(id, substream) => ( - Some(SubstreamState::InWaitingUser(id, substream)), + } +} +/// Advances one inbound substream. +/// +/// Returns the new state for that substream, an event to generate, and whether the substream +/// should be polled again. +fn advance_inbound_substream( + state: InboundSubstreamState, + cx: &mut Context<'_>, +) -> ( + Option, + Option< + ConnectionHandlerEvent< + KademliaProtocolConfig, + (KadRequestMsg, Option), + KademliaHandlerEvent, + io::Error, + >, + >, + bool, +) { + match state { + InboundSubstreamState::WaitingMessage { + first, + connection_id, + mut substream, + } => match Stream::poll_next(Pin::new(&mut substream), cx) { + Poll::Ready(Some(Ok(msg))) => { + if let Ok(ev) = process_kad_request(msg, connection_id) { + ( + Some(InboundSubstreamState::WaitingUser(connection_id, substream)), + Some(ConnectionHandlerEvent::Custom(ev)), + false, + ) + } else { + (Some(InboundSubstreamState::Closing(substream)), None, true) + } + } + Poll::Pending => ( + Some(InboundSubstreamState::WaitingMessage { + first, + connection_id, + substream, + }), + None, + false, + ), + Poll::Ready(None) => { + trace!("Inbound substream: EOF"); + (None, None, false) + } + Poll::Ready(Some(Err(e))) => { + trace!("Inbound substream error: {:?}", e); + (None, None, false) + } + }, + InboundSubstreamState::WaitingUser(id, substream) => ( + Some(InboundSubstreamState::WaitingUser(id, substream)), None, false, ), - SubstreamState::InPendingSend(id, mut substream, msg) => { + InboundSubstreamState::PendingSend(id, mut substream, msg) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( - Some(SubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::PendingFlush(id, substream)), None, true, ), Err(_) => (None, None, false), }, Poll::Pending => ( - Some(SubstreamState::InPendingSend(id, substream, msg)), + Some(InboundSubstreamState::PendingSend(id, substream, msg)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InPendingFlush(id, mut substream) => { + InboundSubstreamState::PendingFlush(id, mut substream) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => ( - Some(SubstreamState::InWaitingMessage(id, substream)), + Some(InboundSubstreamState::WaitingMessage { + first: false, + connection_id: id, + substream, + }), None, true, ), Poll::Pending => ( - Some(SubstreamState::InPendingFlush(id, substream)), + Some(InboundSubstreamState::PendingFlush(id, substream)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } } - SubstreamState::InClosing(mut stream) => { + InboundSubstreamState::Closing(mut stream) => { match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), - Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false), + Poll::Pending => (Some(InboundSubstreamState::Closing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), } }