From aa946c2c180ffa05ca925808a8154bd9377848dd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 15 May 2022 21:43:13 +1000 Subject: [PATCH 1/7] Fix `StreamMuxer::Error` to io::Error We are already enforcing that the associated type must convert to `io::Error`. We might as well just make all functions return an `io::Error` directly. --- core/src/either.rs | 45 ++++++++++----------- core/src/muxing.rs | 33 +++++++-------- core/src/muxing/boxed.rs | 55 ++++++++++--------------- core/src/muxing/singleton.rs | 5 +-- core/src/transport/upgrade.rs | 3 +- core/tests/util.rs | 5 +-- muxers/mplex/src/lib.rs | 2 - muxers/yamux/src/lib.rs | 67 +++++++++++++------------------ swarm/src/connection/substream.rs | 6 +-- 9 files changed, 93 insertions(+), 128 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index e1c5316fd6a..2dd01813f7e 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -203,15 +203,14 @@ where { type Substream = EitherOutput; type OutboundSubstream = EitherOutbound; - type Error = io::Error; fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll>> { match self { EitherOutput::First(inner) => inner.poll_event(cx).map(|result| { - result.map_err(|e| e.into()).map(|event| match event { + result.map(|event| match event { StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), StreamMuxerEvent::InboundSubstream(substream) => { StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream)) @@ -219,7 +218,7 @@ where }) }), EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| { - result.map_err(|e| e.into()).map(|event| match event { + result.map(|event| match event { StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), StreamMuxerEvent::InboundSubstream(substream) => { StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream)) @@ -240,16 +239,14 @@ where &self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream, - ) -> Poll> { + ) -> Poll> { match (self, substream) { (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)) - .map_err(|e| e.into()), + .map(|p| p.map(EitherOutput::First)), (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)) - .map_err(|e| e.into()), + .map(|p| p.map(EitherOutput::Second)), _ => panic!("Wrong API usage"), } } @@ -272,13 +269,13 @@ where cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.read_substream(cx, sub, buf).map_err(|e| e.into()) + inner.read_substream(cx, sub, buf) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.read_substream(cx, sub, buf).map_err(|e| e.into()) + inner.read_substream(cx, sub, buf) } _ => panic!("Wrong API usage"), } @@ -289,13 +286,13 @@ where cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &[u8], - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.write_substream(cx, sub, buf).map_err(|e| e.into()) + inner.write_substream(cx, sub, buf) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.write_substream(cx, sub, buf).map_err(|e| e.into()) + inner.write_substream(cx, sub, buf) } _ => panic!("Wrong API usage"), } @@ -305,13 +302,13 @@ where &self, cx: &mut Context<'_>, sub: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.flush_substream(cx, sub).map_err(|e| e.into()) + inner.flush_substream(cx, sub) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.flush_substream(cx, sub).map_err(|e| e.into()) + inner.flush_substream(cx, sub) } _ => panic!("Wrong API usage"), } @@ -321,13 +318,13 @@ where &self, cx: &mut Context<'_>, sub: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.shutdown_substream(cx, sub).map_err(|e| e.into()) + inner.shutdown_substream(cx, sub) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.shutdown_substream(cx, sub).map_err(|e| e.into()) + inner.shutdown_substream(cx, sub) } _ => panic!("Wrong API usage"), } @@ -346,10 +343,10 @@ where } } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()), - EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()), + EitherOutput::First(inner) => inner.poll_close(cx), + EitherOutput::Second(inner) => inner.poll_close(cx), } } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 969c56c782e..1ae353c5e5a 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -80,9 +80,6 @@ pub trait StreamMuxer { /// Future that will be resolved when the outgoing substream is open. type OutboundSubstream; - /// Error type of the muxer - type Error: Into; - /// Polls for a connection-wide event. /// /// This function behaves the same as a `Stream`. @@ -98,7 +95,7 @@ pub trait StreamMuxer { fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; + ) -> Poll>>; /// Opens a new outgoing substream, and produces the equivalent to a future that will be /// resolved when it becomes available. @@ -120,7 +117,7 @@ pub trait StreamMuxer { &self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream, - ) -> Poll>; + ) -> Poll>; /// Destroys an outbound substream future. Use this after the outbound substream has finished, /// or if you want to interrupt it. @@ -142,7 +139,7 @@ pub trait StreamMuxer { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll>; + ) -> Poll>; /// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`. /// @@ -160,7 +157,7 @@ pub trait StreamMuxer { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8], - ) -> Poll>; + ) -> Poll>; /// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`. /// @@ -176,7 +173,7 @@ pub trait StreamMuxer { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll>; + ) -> Poll>; /// Attempts to shut down the writing side of a substream. The behaviour is similar to /// `AsyncWrite::poll_close`. @@ -193,7 +190,7 @@ pub trait StreamMuxer { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll>; + ) -> Poll>; /// Destroys a substream. fn destroy_substream(&self, s: Self::Substream); @@ -210,7 +207,7 @@ pub trait StreamMuxer { /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. - fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; } /// Event about a connection, reported by an implementation of [`StreamMuxer`]. @@ -242,7 +239,7 @@ impl StreamMuxerEvent { /// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. pub fn event_from_ref_and_wrap

( muxer: P, -) -> impl Future>, ::Error>> +) -> impl Future>>> where P: Deref + Clone, P::Target: StreamMuxer, @@ -281,7 +278,7 @@ where P: Deref + Clone, P::Target: StreamMuxer, { - type Output = Result, ::Error>; + type Output = io::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Future::poll(Pin::new(&mut self.inner), cx) { @@ -330,7 +327,7 @@ where P: Deref, P::Target: StreamMuxer, { - type Output = Result<::Substream, ::Error>; + type Output = io::Result<::Substream>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // We use a `this` because the compiler isn't smart enough to allow mutably borrowing @@ -419,7 +416,7 @@ where let this = &mut *self; let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.read_substream(cx, s, buf).map_err(|e| e.into()) + this.muxer.read_substream(cx, s, buf) } } @@ -438,7 +435,7 @@ where let this = &mut *self; let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.write_substream(cx, s, buf).map_err(|e| e.into()) + this.muxer.write_substream(cx, s, buf) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -451,12 +448,12 @@ where match this.shutdown_state { ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) { Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => return Poll::Pending, }, ShutdownState::Flush => match this.muxer.flush_substream(cx, s) { Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => return Poll::Pending, }, ShutdownState::Done => { @@ -472,7 +469,7 @@ where let this = &mut *self; let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.flush_substream(cx, s).map_err(|e| e.into()) + this.muxer.flush_substream(cx, s) } } diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 1284febe5dd..dee654aa540 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -8,11 +8,7 @@ use std::task::{Context, Poll}; /// Abstract `StreamMuxer`. pub struct StreamMuxerBox { - inner: Box< - dyn StreamMuxer - + Send - + Sync, - >, + inner: Box + Send + Sync>, } struct Wrap @@ -32,20 +28,19 @@ where { type Substream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; #[inline] fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll>> { let substream = match self.inner.poll_event(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) } Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), }; let id = self.next_substream.fetch_add(1, Ordering::Relaxed); @@ -66,7 +61,7 @@ where &self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream, - ) -> Poll> { + ) -> Poll> { let mut list = self.outbound.lock(); let substream = match self .inner @@ -74,7 +69,7 @@ where { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), }; let id = self.next_substream.fetch_add(1, Ordering::Relaxed); self.substreams.lock().insert(id, substream); @@ -94,11 +89,9 @@ where cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); - self.inner - .read_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) + self.inner.read_substream(cx, list.get_mut(s).unwrap(), buf) } #[inline] @@ -107,11 +100,10 @@ where cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8], - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); self.inner .write_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) } #[inline] @@ -119,11 +111,9 @@ where &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); - self.inner - .flush_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) + self.inner.flush_substream(cx, list.get_mut(s).unwrap()) } #[inline] @@ -131,11 +121,9 @@ where &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); - self.inner - .shutdown_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) + self.inner.shutdown_substream(cx, list.get_mut(s).unwrap()) } #[inline] @@ -146,8 +134,8 @@ where } #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(|e| e.into()) + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) } } @@ -176,13 +164,12 @@ impl StreamMuxerBox { impl StreamMuxer for StreamMuxerBox { type Substream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; #[inline] fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll>> { self.inner.poll_event(cx) } @@ -196,7 +183,7 @@ impl StreamMuxer for StreamMuxerBox { &self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream, - ) -> Poll> { + ) -> Poll> { self.inner.poll_outbound(cx, s) } @@ -211,7 +198,7 @@ impl StreamMuxer for StreamMuxerBox { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { self.inner.read_substream(cx, s, buf) } @@ -221,7 +208,7 @@ impl StreamMuxer for StreamMuxerBox { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8], - ) -> Poll> { + ) -> Poll> { self.inner.write_substream(cx, s, buf) } @@ -230,7 +217,7 @@ impl StreamMuxer for StreamMuxerBox { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { self.inner.flush_substream(cx, s) } @@ -239,7 +226,7 @@ impl StreamMuxer for StreamMuxerBox { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { self.inner.shutdown_substream(cx, s) } @@ -249,7 +236,7 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_close(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 529fa147f9c..d52915cf7c6 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -72,12 +72,11 @@ where { type Substream = Substream; type OutboundSubstream = OutboundSubstream; - type Error = io::Error; fn poll_event( &self, _: &mut Context<'_>, - ) -> Poll, io::Error>> { + ) -> Poll>> { match self.endpoint { Endpoint::Dialer => return Poll::Pending, Endpoint::Listener => {} @@ -149,7 +148,7 @@ where fn destroy_substream(&self, _: Self::Substream) {} - fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 5508429754f..3930adbfd17 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -22,9 +22,10 @@ pub use crate::upgrade::Version; +use crate::muxing::StreamMuxerBox; use crate::{ connection::ConnectedPoint, - muxing::{StreamMuxer, StreamMuxerBox}, + muxing::StreamMuxer, transport::{ and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, TransportError, diff --git a/core/tests/util.rs b/core/tests/util.rs index 7ca52188a52..664ecb57410 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -2,7 +2,7 @@ use futures::prelude::*; use libp2p_core::muxing::StreamMuxer; -use std::{pin::Pin, task::Context, task::Poll}; +use std::{io, pin::Pin, task::Context, task::Poll}; pub struct CloseMuxer { state: CloseMuxerState, @@ -24,9 +24,8 @@ pub enum CloseMuxerState { impl Future for CloseMuxer where M: StreamMuxer, - M::Error: From, { - type Output = Result; + type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index f2a95c21151..cd0fce61f66 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -88,8 +88,6 @@ where { type Substream = Substream; type OutboundSubstream = OutboundSubstream; - type Error = io::Error; - fn poll_event( &self, cx: &mut Context<'_>, diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 91bd27376f7..48cd5003b19 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -35,7 +35,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use thiserror::Error; /// A Yamux connection. pub struct Yamux(Mutex>); @@ -95,26 +94,23 @@ where } } -pub type YamuxResult = Result; - /// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. impl StreamMuxer for Yamux where - S: Stream> + Unpin, + S: Stream> + Unpin, { type Substream = yamux::Stream; type OutboundSubstream = OpenSubstreamToken; - type Error = YamuxError; fn poll_event( &self, c: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let mut inner = self.0.lock(); match ready!(inner.incoming.poll_next_unpin(c)) { Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(e)), - None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())), + Some(Err(e)) => Poll::Ready(Err(to_io_error(e))), + None => Poll::Ready(Err(to_io_error(yamux::ConnectionError::Closed))), } } @@ -126,11 +122,11 @@ where &self, c: &mut Context<'_>, _: &mut OpenSubstreamToken, - ) -> Poll> { + ) -> Poll> { let mut inner = self.0.lock(); Pin::new(&mut inner.control) .poll_open_stream(c) - .map_err(YamuxError) + .map_err(to_io_error) } fn destroy_outbound(&self, _: Self::OutboundSubstream) { @@ -142,10 +138,8 @@ where c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8], - ) -> Poll> { - Pin::new(s) - .poll_read(c, b) - .map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_read(c, b) } fn write_substream( @@ -153,41 +147,41 @@ where c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8], - ) -> Poll> { - Pin::new(s) - .poll_write(c, b) - .map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_write(c, b) } fn flush_substream( &self, c: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_flush(c) } fn shutdown_substream( &self, c: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_close(c) } fn destroy_substream(&self, _: Self::Substream) {} - fn poll_close(&self, c: &mut Context<'_>) -> Poll> { + fn poll_close(&self, c: &mut Context<'_>) -> Poll> { let mut inner = self.0.lock(); if let Poll::Ready(()) = Pin::new(&mut inner.control) .poll_close(c) - .map_err(YamuxError)? + .map_err(to_io_error)? { return Poll::Ready(Ok(())); } - while let Poll::Ready(maybe_inbound_stream) = inner.incoming.poll_next_unpin(c)? { + while let Poll::Ready(maybe_inbound_stream) = + inner.incoming.poll_next_unpin(c).map_err(to_io_error)? + { match maybe_inbound_stream { Some(inbound_stream) => mem::drop(inbound_stream), None => return Poll::Ready(Ok(())), @@ -386,23 +380,16 @@ where } } -/// The Yamux [`StreamMuxer`] error type. -#[derive(Debug, Error)] -#[error("yamux error: {0}")] -pub struct YamuxError(#[from] yamux::ConnectionError); - -impl From for io::Error { - fn from(err: YamuxError) -> Self { - match err.0 { - yamux::ConnectionError::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e), - } +fn to_io_error(e: yamux::ConnectionError) -> io::Error { + match e { + yamux::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), } } /// The [`futures::stream::Stream`] of incoming substreams. pub struct Incoming { - stream: BoxStream<'static, Result>, + stream: BoxStream<'static, Result>, _marker: std::marker::PhantomData, } @@ -414,7 +401,7 @@ impl fmt::Debug for Incoming { /// The [`futures::stream::Stream`] of incoming substreams (`!Send`). pub struct LocalIncoming { - stream: LocalBoxStream<'static, Result>, + stream: LocalBoxStream<'static, Result>, _marker: std::marker::PhantomData, } @@ -425,7 +412,7 @@ impl fmt::Debug for LocalIncoming { } impl Stream for Incoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.as_mut().poll_next_unpin(cx) @@ -439,7 +426,7 @@ impl Stream for Incoming { impl Unpin for Incoming {} impl Stream for LocalIncoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.as_mut().poll_next_unpin(cx) diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index ba83a67e46a..b7e3d3bad87 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -147,7 +147,7 @@ where Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => {} } @@ -169,7 +169,7 @@ where } Poll::Ready(Err(err)) => { self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err.into())); + return Poll::Ready(Err(err)); } } } @@ -214,7 +214,7 @@ where match self.muxer.poll_close(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), } } } From 4c39aec1c4ec26b7b5fa2817a1f0c31d51b9eb84 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 19 May 2022 17:28:33 +0200 Subject: [PATCH 2/7] Add newline before return value Visually separating this makes it easier to read. --- muxers/yamux/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 48cd5003b19..fb4bc44ecb8 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -71,6 +71,7 @@ where }, control: ctrl, }; + Yamux(Mutex::new(inner)) } } @@ -90,6 +91,7 @@ where }, control: ctrl, }; + Yamux(Mutex::new(inner)) } } From d44b0a62e05b01002818b5e892681e48a698f78d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 19 May 2022 17:40:29 +0200 Subject: [PATCH 3/7] Re-design the `StreamMuxer` trait The current `StreamMuxer` API is based around a poll-based approach where each substream needs to be passed by value to read/write/close it. The muxer itself however is not available in the end-user abstractions like `ConnectionHandler`. To circumvent this, the `StreamMuxerBox` type exists which allows a `StreamMuxer` to be cloned. Together with `SubstreamRef`, this allows each substream to own a reference to the muxer it was created with and pass itself to the muxer within implementations of `AsyncRead` and `AsyncWrite`. These implementations are convenient as they allow an end-user to simply read and write to the substream without holding a reference to anything else. We can achieve the same goal by changing the `StreamMuxerBox` abstraction to use a `SubstreamBox`. Users of `libp2p-core` can continue to depend on the `Substream` associated type of `StreamMuxer`. --- core/CHANGELOG.md | 3 +- core/src/either.rs | 150 +-------- core/src/muxing.rs | 429 +++++------------------- core/src/muxing/boxed.rs | 286 ++++++---------- core/src/muxing/singleton.rs | 118 ++----- core/src/transport/upgrade.rs | 5 +- core/tests/util.rs | 2 +- muxers/mplex/benches/split_send_size.rs | 32 +- muxers/mplex/src/lib.rs | 153 +++++---- muxers/mplex/tests/async_write.rs | 16 +- muxers/mplex/tests/two_peers.rs | 59 ++-- muxers/yamux/CHANGELOG.md | 3 + muxers/yamux/src/lib.rs | 124 +++---- swarm/src/connection.rs | 49 ++- swarm/src/connection/handler_wrapper.rs | 16 +- swarm/src/connection/pool.rs | 5 +- swarm/src/connection/substream.rs | 257 -------------- swarm/src/lib.rs | 5 +- 18 files changed, 449 insertions(+), 1263 deletions(-) delete mode 100644 swarm/src/connection/substream.rs diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index bebe025d468..da88ce9aa55 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -4,12 +4,13 @@ - Remove `StreamMuxer::flush_all`. See [PR 2669]. - Rename `StreamMuxer::close` to `StreamMuxer::poll_close`. See [PR 2666]. - Remove deprecated function `StreamMuxer::is_remote_acknowledged`. See [PR 2665]. +- Re-design `StreamMuxer` trait to take `&mut self` and remove all associated types but `Substream`. See [PR 2648]. [PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529 [PR 2666]: https://github.com/libp2p/rust-libp2p/pull/2666 [PR 2665]: https://github.com/libp2p/rust-libp2p/pull/2665 [PR 2669]: https://github.com/libp2p/rust-libp2p/pull/2669 - +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.32.1 diff --git a/core/src/either.rs b/core/src/either.rs index 2dd01813f7e..1cbf72b5193 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - muxing::{StreamMuxer, StreamMuxerEvent}, + muxing::{OutboundSubstreamId, StreamMuxer, StreamMuxerEvent}, transport::{ListenerEvent, Transport, TransportError}, Multiaddr, ProtocolName, }; @@ -202,148 +202,29 @@ where B: StreamMuxer, { type Substream = EitherOutput; - type OutboundSubstream = EitherOutbound; - fn poll_event( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, ) -> Poll>> { match self { - EitherOutput::First(inner) => inner.poll_event(cx).map(|result| { - result.map(|event| match event { - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream)) - } - }) - }), - EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| { - result.map(|event| match event { - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream)) - } - }) - }), - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { - match self { - EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()), - EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()), - } - } - - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)), - (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)), - _ => panic!("Wrong API usage"), - } - } - - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - match self { - EitherOutput::First(inner) => match substream { - EitherOutbound::A(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutbound::B(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - } - } - - fn read_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.read_substream(cx, sub, buf) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.read_substream(cx, sub, buf) - } - _ => panic!("Wrong API usage"), - } - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.write_substream(cx, sub, buf) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.write_substream(cx, sub, buf) - } - _ => panic!("Wrong API usage"), - } - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.flush_substream(cx, sub) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.flush_substream(cx, sub) + EitherOutput::First(inner) => { + inner.poll(cx).map_ok(|e| e.map_stream(EitherOutput::First)) } - _ => panic!("Wrong API usage"), + EitherOutput::Second(inner) => inner + .poll(cx) + .map_ok(|e| e.map_stream(EitherOutput::Second)), } } - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.shutdown_substream(cx, sub) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.shutdown_substream(cx, sub) - } - _ => panic!("Wrong API usage"), - } - } - - fn destroy_substream(&self, substream: Self::Substream) { + fn open_outbound(&mut self) -> OutboundSubstreamId { match self { - EitherOutput::First(inner) => match substream { - EitherOutput::First(substream) => inner.destroy_substream(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutput::Second(substream) => inner.destroy_substream(substream), - _ => panic!("Wrong API usage"), - }, + EitherOutput::First(inner) => inner.open_outbound(), + EitherOutput::Second(inner) => inner.open_outbound(), } } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner.poll_close(cx), EitherOutput::Second(inner) => inner.poll_close(cx), @@ -351,13 +232,6 @@ where } } -#[derive(Debug, Copy, Clone)] -#[must_use = "futures do nothing unless polled"] -pub enum EitherOutbound { - A(A::OutboundSubstream), - B(B::OutboundSubstream), -} - /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. #[pin_project(project = EitherListenStreamProj)] #[derive(Debug, Copy, Clone)] diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 1ae353c5e5a..847a193b832 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -21,8 +21,7 @@ //! Muxing is the process of splitting a connection into multiple substreams. //! //! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer` -//! has ownership of a connection, lets you open and close substreams, and read/write data -//! on open substreams. +//! has ownership of a connection, lets you open and close substreams. //! //! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this //! > is managed by the library's internals. @@ -51,163 +50,52 @@ //! The upgrade process will take ownership of the connection, which makes it possible for the //! implementation of `StreamMuxer` to control everything that happens on the wire. -use futures::{future, prelude::*, task::Context, task::Poll}; +use futures::{task::Context, task::Poll}; use multiaddr::Multiaddr; -use std::{fmt, io, ops::Deref, pin::Pin}; +use std::collections::VecDeque; +use std::io; pub use self::boxed::StreamMuxerBox; +pub use self::boxed::SubstreamBox; pub use self::singleton::SingletonMuxer; mod boxed; mod singleton; -/// Implemented on objects that can open and manage substreams. -/// -/// The state of a muxer, as exposed by this API, is the following: -/// -/// - A connection to the remote. The `poll_event`, `flush_all` and `close` methods operate -/// on this. -/// - A list of substreams that are open. The `poll_outbound`, `read_substream`, `write_substream`, -/// `flush_substream`, `shutdown_substream` and `destroy_substream` methods allow controlling -/// these entries. -/// - A list of outbound substreams being opened. The `open_outbound`, `poll_outbound` and -/// `destroy_outbound` methods allow controlling these entries. +/// Provides multiplexing for a connection by allowing users to open substreams. /// +/// A [`StreamMuxer`] is a stateful object and needs to be continuously polled to make progress. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream; - /// Future that will be resolved when the outgoing substream is open. - type OutboundSubstream; - - /// Polls for a connection-wide event. - /// - /// This function behaves the same as a `Stream`. - /// - /// If `Pending` is returned, then the current task will be notified once the muxer - /// is ready to be polled, similar to the API of `Stream::poll()`. - /// Only the latest task that was used to call this method may be notified. + /// Polls the [`StreamMuxer`] to make progress. /// /// It is permissible and common to use this method to perform background /// work, such as processing incoming packets and polling timers. /// /// An error can be generated if the connection has been closed. - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>>; - - /// Opens a new outgoing substream, and produces the equivalent to a future that will be - /// resolved when it becomes available. - /// - /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced - /// through the methods on the `StreamMuxer` trait. - fn open_outbound(&self) -> Self::OutboundSubstream; - - /// Polls the outbound substream. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be polled, similar to the API of `Future::poll()`. - /// However, for each individual outbound substream, only the latest task that was used to - /// call this method may be notified. - /// - /// May panic or produce an undefined result if an earlier polling of the same substream - /// returned `Ready` or `Err`. - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll>; - - /// Destroys an outbound substream future. Use this after the outbound substream has finished, - /// or if you want to interrupt it. - fn destroy_outbound(&self, s: Self::OutboundSubstream); - - /// Reads data from a substream. The behaviour is the same as `futures::AsyncRead::poll_read`. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. However, for each individual substream, only the latest task that - /// was used to call this method may be notified. - /// - /// If `Async::Ready(0)` is returned, the substream has been closed by the remote and should - /// no longer be read afterwards. - /// - /// An error can be generated if the connection has been closed, or if a protocol misbehaviour - /// happened. - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll>; - - /// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. For each individual substream, only the latest task that was used to - /// call this method may be notified. - /// - /// Calling `write_substream` does not guarantee that data will arrive to the remote. To - /// ensure that, you should call `flush_substream`. - /// - /// It is incorrect to call this method on a substream if you called `shutdown_substream` on - /// this substream earlier. - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll>; + fn poll(&mut self, cx: &mut Context<'_>) + -> Poll>>; - /// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`. - /// - /// After this method has been called, data written earlier on the substream is guaranteed to - /// be received by the remote. + /// Instruct this [`StreamMuxer`] to open a new outbound substream. /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. For each individual substream, only the latest task that was used to - /// call this method may be notified. - /// - /// > **Note**: This method may be implemented as a call to `flush_all`. - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll>; - - /// Attempts to shut down the writing side of a substream. The behaviour is similar to - /// `AsyncWrite::poll_close`. - /// - /// Contrary to `AsyncWrite::poll_close`, shutting down a substream does not imply - /// `flush_substream`. If you want to make sure that the remote is immediately informed about - /// the shutdown, use `flush_substream` or `flush_all`. - /// - /// After this method has been called, you should no longer attempt to write to this substream. - /// - /// An error can be generated if the connection has been closed, or if a protocol misbehaviour - /// happened. - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll>; - - /// Destroys a substream. - fn destroy_substream(&self, s: Self::Substream); + /// This function returns instantly with a new ID but the substream has not been opened at that + /// point! When the substream is ready to be used, you will receive a + /// [`StreamMuxerEvent::OutboundSubstream`] event with the same ID as returned here. + fn open_outbound(&mut self) -> OutboundSubstreamId; - /// Closes this `StreamMuxer`. + /// Closes this [`StreamMuxer`]. /// /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns, /// or polls must generate an error or be ignored. /// - /// Calling this method implies `flush_all`. - /// /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. - fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll>; } /// Event about a connection, reported by an implementation of [`StreamMuxer`]. @@ -216,6 +104,10 @@ pub enum StreamMuxerEvent { /// Remote has opened a new substream. Contains the substream in question. InboundSubstream(T), + /// We have opened a new substream. Contains the substream and the ID returned + /// from [`StreamMuxer::open_outbound`]. + OutboundSubstream(T, OutboundSubstreamId), + /// Address to the remote has changed. The previous one is now obsolete. /// /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes @@ -233,253 +125,82 @@ impl StreamMuxerEvent { None } } -} -/// Polls for an event from the muxer and, if an inbound substream, wraps this substream in an -/// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. -pub fn event_from_ref_and_wrap

( - muxer: P, -) -> impl Future>>> -where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - let muxer2 = muxer.clone(); - future::poll_fn(move |cx| muxer.poll_event(cx)).map_ok(|event| match event { - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(substream_from_ref(muxer2, substream)) + /// If `self` is a [`StreamMuxerEvent::OutboundSubstream`], returns the content. Otherwise + /// returns `None`. + pub fn into_outbound_substream(self) -> Option<(T, OutboundSubstreamId)> { + if let StreamMuxerEvent::OutboundSubstream(s, id) = self { + Some((s, id)) + } else { + None } - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - }) -} - -/// Same as `outbound_from_ref`, but wraps the output in an object that -/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. -pub fn outbound_from_ref_and_wrap

(muxer: P) -> OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - let inner = outbound_from_ref(muxer); - OutboundSubstreamRefWrapFuture { inner } -} - -/// Future returned by `outbound_from_ref_and_wrap`. -pub struct OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - inner: OutboundSubstreamRefFuture

, -} - -impl

Future for OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - type Output = io::Result>; + } - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Future::poll(Pin::new(&mut self.inner), cx) { - Poll::Ready(Ok(substream)) => { - let out = substream_from_ref(self.inner.muxer.clone(), substream); - Poll::Ready(Ok(out)) + /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] and + /// [`StreamMuxerEvent::OutboundSubstream`] to a new type. + pub fn map_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { + match self { + StreamMuxerEvent::InboundSubstream(stream) => { + StreamMuxerEvent::InboundSubstream(map(stream)) } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + StreamMuxerEvent::OutboundSubstream(stream, id) => { + StreamMuxerEvent::OutboundSubstream(map(stream), id) + } + StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), } } } -/// Builds a new future for an outbound substream, where the muxer is a reference. -pub fn outbound_from_ref

(muxer: P) -> OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - let outbound = muxer.open_outbound(); - OutboundSubstreamRefFuture { - muxer, - outbound: Some(outbound), - } -} - -/// Future returned by `outbound_from_ref`. -pub struct OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - muxer: P, - outbound: Option<::OutboundSubstream>, -} - -impl

Unpin for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ -} - -impl

Future for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - type Output = io::Result<::Substream>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - this.muxer - .poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty")) - } -} - -impl

Drop for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn drop(&mut self) { - self.muxer - .destroy_outbound(self.outbound.take().expect("outbound was empty")) - } -} +#[derive(Default, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct OutboundSubstreamId(u64); -/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the -/// muxer and a substream. -pub fn substream_from_ref

( - muxer: P, - substream: ::Substream, -) -> SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - SubstreamRef { - muxer, - substream: Some(substream), - shutdown_state: ShutdownState::Shutdown, - } -} +impl OutboundSubstreamId { + pub(crate) const ZERO: OutboundSubstreamId = OutboundSubstreamId(0); -/// Stream returned by `substream_from_ref`. -pub struct SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - muxer: P, - substream: Option<::Substream>, - shutdown_state: ShutdownState, -} + fn fetch_add(&mut self) -> Self { + let next_id = Self(self.0); -enum ShutdownState { - Shutdown, - Flush, - Done, -} + self.0 += 1; -impl

fmt::Debug for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, - ::Substream: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(f, "Substream({:?})", self.substream) + next_id } } -impl

Unpin for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ +/// Utility state machine for muxers to simplify the generation of new [`OutboundSubstreamId`]s +/// and the state handling around when new outbound substreams should be opened. +#[derive(Default)] +pub struct OutboundSubstreams { + current_id: OutboundSubstreamId, + substreams_to_open: VecDeque, } -impl

AsyncRead for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; +impl OutboundSubstreams { + /// Instruct this state machine to open a new outbound substream. + pub fn open_new(&mut self) -> OutboundSubstreamId { + let next_id = self.current_id.fetch_add(); + self.substreams_to_open.push_back(next_id); - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.read_substream(cx, s, buf) + next_id } -} -impl

AsyncWrite for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.write_substream(cx, s, buf) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - loop { - match this.shutdown_state { - ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) { - Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }, - ShutdownState::Flush => match this.muxer.flush_substream(cx, s) { - Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }, - ShutdownState::Done => { - return Poll::Ready(Ok(())); - } - } + /// Allow this state machine to make progress. + /// + /// In case a new substream was requested, we invoke the provided poll function and return the + /// created substream along with its ID. + pub fn poll( + &mut self, + poll_open: impl FnOnce() -> Poll>, + ) -> Poll> { + if self.substreams_to_open.is_empty() { + return Poll::Pending; } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.flush_substream(cx, s) - } -} + let stream = futures::ready!(poll_open())?; + let id = self + .substreams_to_open + .pop_front() + .expect("we checked that we are not empty"); -impl

Drop for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn drop(&mut self) { - self.muxer - .destroy_substream(self.substream.take().expect("substream was empty")) + Poll::Ready(Ok((stream, id))) } } diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index dee654aa540..a95331e62b1 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,242 +1,164 @@ -use crate::muxing::StreamMuxerEvent; +use crate::muxing::{OutboundSubstreamId, StreamMuxerEvent}; use crate::StreamMuxer; -use fnv::FnvHashMap; -use parking_lot::Mutex; -use std::io; -use std::sync::atomic::{AtomicUsize, Ordering}; +use futures::{AsyncRead, AsyncWrite, Stream}; +use std::io::{IoSlice, IoSliceMut}; +use std::pin::Pin; use std::task::{Context, Poll}; +use std::{fmt, io}; -/// Abstract `StreamMuxer`. +/// Abstract [`StreamMuxer`] with [`SubstreamBox`] as its `Substream` type. pub struct StreamMuxerBox { - inner: Box + Send + Sync>, + inner: Box + Send + 'static>, } -struct Wrap -where - T: StreamMuxer, -{ +/// Abstract type for asynchronous reading and writing. +/// +/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead` +/// and `AsyncWrite` capabilities. +pub struct SubstreamBox(Box); + +/// Helper type to abstract away the concrete [`Substream`](StreamMuxer::Substream) type of a [`StreamMuxer`]. +/// +/// Within this implementation, we map the concrete [`Substream`](StreamMuxer::Substream) to a [`SubstreamBox`]. +/// By parameterizing the entire [`Wrap`] type, we can stuff it into a [`Box`] and erase the concrete type. +struct Wrap { inner: T, - substreams: Mutex>, - next_substream: AtomicUsize, - outbound: Mutex>, - next_outbound: AtomicUsize, } -impl StreamMuxer for Wrap +impl StreamMuxer for Wrap where - T: StreamMuxer, + T: StreamMuxer, + S: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype + type Substream = SubstreamBox; - #[inline] - fn poll_event( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, ) -> Poll>> { - let substream = match self.inner.poll_event(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { - return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) - } - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - }; - - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id))) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - let outbound = self.inner.open_outbound(); - let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); - self.outbound.lock().insert(id, outbound); - id - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut list = self.outbound.lock(); - let substream = match self - .inner - .poll_outbound(cx, list.get_mut(substream).unwrap()) - { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - }; - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(id)) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - let mut list = self.outbound.lock(); - self.inner - .destroy_outbound(list.remove(&substream).unwrap()) - } - - #[inline] - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner.read_substream(cx, list.get_mut(s).unwrap(), buf) - } + let event = futures::ready!(self.inner.poll(cx))?.map_stream(SubstreamBox::new); - #[inline] - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .write_substream(cx, list.get_mut(s).unwrap(), buf) + Poll::Ready(Ok(event)) } - #[inline] - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner.flush_substream(cx, list.get_mut(s).unwrap()) - } - - #[inline] - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner.shutdown_substream(cx, list.get_mut(s).unwrap()) - } - - #[inline] - fn destroy_substream(&self, substream: Self::Substream) { - let mut list = self.substreams.lock(); - self.inner - .destroy_substream(list.remove(&substream).unwrap()) + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.inner.open_outbound() } - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_close(cx) } } impl StreamMuxerBox { - /// Turns a stream muxer into a `StreamMuxerBox`. - pub fn new(muxer: T) -> StreamMuxerBox + /// Construct a new [`StreamMuxerBox`]. + pub fn new(muxer: impl StreamMuxer + Send + 'static) -> Self where - T: StreamMuxer + Send + Sync + 'static, - T::OutboundSubstream: Send, - T::Substream: Send, + S: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - let wrap = Wrap { - inner: muxer, - substreams: Mutex::new(Default::default()), - next_substream: AtomicUsize::new(0), - outbound: Mutex::new(Default::default()), - next_outbound: AtomicUsize::new(0), - }; - - StreamMuxerBox { - inner: Box::new(wrap), + Self { + inner: Box::new(Wrap { inner: muxer }), } } } impl StreamMuxer for StreamMuxerBox { - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype + type Substream = SubstreamBox; - #[inline] - fn poll_event( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, ) -> Poll>> { - self.inner.poll_event(cx) + self.inner.poll(cx) } - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { + fn open_outbound(&mut self) -> OutboundSubstreamId { self.inner.open_outbound() } - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll> { - self.inner.poll_outbound(cx, s) + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) } +} + +impl Stream for StreamMuxerBox { + type Item = io::Result::Substream>>; - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - self.inner.destroy_outbound(substream) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().poll(cx).map(Some) } +} + +impl SubstreamBox { + /// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`]. + pub fn new(stream: S) -> Self { + Self(Box::new(stream)) + } +} + +impl fmt::Debug for SubstreamBox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SubstreamBox({})", self.0.type_name()) + } +} + +/// Workaround because Rust does not allow `Box`. +trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { + /// Helper function to capture the erased inner type. + /// + /// Used to make the `Debug` implementation of `SubstreamBox` more useful. + fn type_name(&self) -> &'static str; +} - #[inline] - fn read_substream( - &self, +impl AsyncReadWrite for S +where + S: AsyncRead + AsyncWrite + Unpin, +{ + fn type_name(&self) -> &'static str { + std::any::type_name::() + } +} + +impl AsyncRead for SubstreamBox { + fn poll_read( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { - self.inner.read_substream(cx, s, buf) + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) } - #[inline] - fn write_substream( - &self, + fn poll_read_vectored( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - self.inner.write_substream(cx, s, buf) + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) } +} - #[inline] - fn flush_substream( - &self, +impl AsyncWrite for SubstreamBox { + fn poll_write( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.flush_substream(cx, s) + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) } - #[inline] - fn shutdown_substream( - &self, + fn poll_write_vectored( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.shutdown_substream(cx, s) + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) } - #[inline] - fn destroy_substream(&self, s: Self::Substream) { - self.inner.destroy_substream(s) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) } - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_close(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index d52915cf7c6..522f6042f46 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,20 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::muxing::OutboundSubstreamId; use crate::{ connection::Endpoint, muxing::{StreamMuxer, StreamMuxerEvent}, }; use futures::prelude::*; -use parking_lot::Mutex; -use std::{ - io, - pin::Pin, - sync::atomic::{AtomicBool, Ordering}, - task::Context, - task::Poll, -}; +use std::{io, task::Context, task::Poll}; /// Implementation of `StreamMuxer` that allows only one substream on top of a connection, /// yielding the connection itself. @@ -40,9 +34,7 @@ use std::{ /// Most notably, no protocol is negotiated. pub struct SingletonMuxer { /// The inner connection. - inner: Mutex, - /// If true, a substream has been produced and any further attempt should fail. - substream_extracted: AtomicBool, + inner: Option, /// Our local endpoint. Always the same value as was passed to `new`. endpoint: Endpoint, } @@ -54,101 +46,39 @@ impl SingletonMuxer { /// If `endpoint` is `Listener`, then only one inbound substream will be permitted. pub fn new(inner: TSocket, endpoint: Endpoint) -> Self { SingletonMuxer { - inner: Mutex::new(inner), - substream_extracted: AtomicBool::new(false), + inner: Some(inner), endpoint, } } } -/// Substream of the `SingletonMuxer`. -pub struct Substream {} -/// Outbound substream attempt of the `SingletonMuxer`. -pub struct OutboundSubstream {} - impl StreamMuxer for SingletonMuxer where - TSocket: AsyncRead + AsyncWrite + Unpin, + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Substream = Substream; - type OutboundSubstream = OutboundSubstream; - - fn poll_event( - &self, - _: &mut Context<'_>, - ) -> Poll>> { - match self.endpoint { - Endpoint::Dialer => return Poll::Pending, - Endpoint::Listener => {} - } - - if !self.substream_extracted.swap(true, Ordering::Relaxed) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream {}))) - } else { - Poll::Pending - } + type Substream = TSocket; + + fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { + let stream = match self.inner.take() { + None => return Poll::Pending, + Some(stream) => stream, + }; + + let event = match self.endpoint { + Endpoint::Dialer => { + StreamMuxerEvent::OutboundSubstream(stream, OutboundSubstreamId::ZERO) + } + Endpoint::Listener => StreamMuxerEvent::InboundSubstream(stream), + }; + + Poll::Ready(Ok(event)) } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} - } - - fn poll_outbound( - &self, - _: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - match self.endpoint { - Endpoint::Listener => return Poll::Pending, - Endpoint::Dialer => {} - } - - if !self.substream_extracted.swap(true, Ordering::Relaxed) { - Poll::Ready(Ok(Substream {})) - } else { - Poll::Pending - } + fn open_outbound(&mut self) -> OutboundSubstreamId { + OutboundSubstreamId::ZERO } - fn destroy_outbound(&self, _: Self::OutboundSubstream) {} - - fn read_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf) - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut *self.inner.lock()), cx, buf) - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - ) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx) - } - - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - ) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut *self.inner.lock()), cx) - } - - fn destroy_substream(&self, _: Self::Substream) {} - - fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 3930adbfd17..6cd1189ed1d 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -300,9 +300,8 @@ impl Multiplexed { T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Error: Send + Sync, - M: StreamMuxer + Send + Sync + 'static, - M::Substream: Send + 'static, - M::OutboundSubstream: Send + 'static, + M: StreamMuxer + Send + 'static, + M::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) } diff --git a/core/tests/util.rs b/core/tests/util.rs index 664ecb57410..1bb33f63bd4 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -30,7 +30,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match std::mem::replace(&mut self.state, CloseMuxerState::Done) { - CloseMuxerState::Close(muxer) => { + CloseMuxerState::Close(mut muxer) => { if !muxer.poll_close(cx)?.is_ready() { self.state = CloseMuxerState::Close(muxer); return Poll::Pending; diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index ff52177529d..692d8d75cfa 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -103,8 +103,8 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd addr_sender.take().unwrap().send(a).unwrap(); } transport::ListenerEvent::Upgrade { upgrade, .. } => { - let (_peer, conn) = upgrade.await.unwrap(); - let mut s = poll_fn(|cx| conn.poll_event(cx)) + let (_peer, mut conn) = upgrade.await.unwrap(); + let mut s = poll_fn(|cx| conn.poll(cx)) .await .expect("unexpected error") .into_inbound_substream() @@ -115,9 +115,7 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd loop { // Read in typical chunk sizes of up to 8KiB. let end = off + std::cmp::min(buf.len() - off, 8 * 1024); - let n = poll_fn(|cx| conn.read_substream(cx, &mut s, &mut buf[off..end])) - .await - .unwrap(); + let n = s.read(&mut buf[off..end]).await.unwrap(); off += n; if off == buf.len() { return; @@ -132,24 +130,16 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd // Spawn and block on the sender, i.e. until all data is sent. task::block_on(async move { let addr = addr_receiver.await.unwrap(); - let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); - let mut handle = conn.open_outbound(); - let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)) + let (_peer, mut conn) = transport.dial(addr).unwrap().await.unwrap(); + conn.open_outbound(); + let (mut stream, _) = poll_fn(|cx| conn.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); - let mut off = 0; - loop { - let n = poll_fn(|cx| conn.write_substream(cx, &mut stream, &payload[off..])) - .await - .unwrap(); - off += n; - if off == payload.len() { - poll_fn(|cx| conn.flush_substream(cx, &mut stream)) - .await - .unwrap(); - return; - } - } + + stream.write_all(&payload).await.unwrap(); + stream.flush().await.unwrap(); }); // Wait for all data to be received. diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index cd0fce61f66..50d9c812ce0 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -28,12 +28,14 @@ use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; use libp2p_core::{ + muxing::OutboundSubstreamId, + muxing::OutboundSubstreams, muxing::StreamMuxerEvent, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, StreamMuxer, }; use parking_lot::Mutex; -use std::{cmp, iter, task::Context, task::Poll}; +use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; impl UpgradeInfo for MplexConfig { type Info = &'static [u8]; @@ -54,7 +56,8 @@ where fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))), + outbound_substreams: OutboundSubstreams::default(), })) } } @@ -69,7 +72,8 @@ where fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))), + outbound_substreams: OutboundSubstreams::default(), })) } } @@ -79,115 +83,142 @@ where /// This implementation isn't capable of detecting when the underlying socket changes its address, /// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. pub struct Multiplex { - io: Mutex>, + io: Arc>>, + outbound_substreams: OutboundSubstreams, } impl StreamMuxer for Multiplex where C: AsyncRead + AsyncWrite + Unpin, { - type Substream = Substream; - type OutboundSubstream = OutboundSubstream; - fn poll_event( - &self, + type Substream = Substream; + + fn poll( + &mut self, cx: &mut Context<'_>, ) -> Poll>> { - let stream_id = ready!(self.io.lock().poll_next_stream(cx))?; - let stream = Substream::new(stream_id); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) - } + if let Poll::Ready((mplex_id, outbound_id)) = self + .outbound_substreams + .poll(|| Pin::new(&mut self.io.lock()).poll_open_stream(cx))? + { + let substream = Substream::new(mplex_id, self.io.clone()); + + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream( + substream, + outbound_id, + ))); + } + + if let Poll::Ready(mplex_id) = self.io.lock().poll_next_stream(cx)? { + let substream = Substream::new(mplex_id, self.io.clone()); + + return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))); + } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} + Poll::Pending } - fn poll_outbound( - &self, - cx: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - let stream_id = ready!(self.io.lock().poll_open_stream(cx))?; - Poll::Ready(Ok(Substream::new(stream_id))) + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.outbound_substreams.open_new() } - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { - // Nothing to do, since `open_outbound` creates no new local state. + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.io.lock().poll_close(cx) } +} - fn read_substream( - &self, +impl AsyncRead for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, cx: &mut Context<'_>, - substream: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { + let this = self.get_mut(); + loop { // Try to read from the current (i.e. last received) frame. - if !substream.current_data.is_empty() { - let len = cmp::min(substream.current_data.len(), buf.len()); - buf[..len].copy_from_slice(&substream.current_data.split_to(len)); + if !this.current_data.is_empty() { + let len = cmp::min(this.current_data.len(), buf.len()); + buf[..len].copy_from_slice(&this.current_data.split_to(len)); return Poll::Ready(Ok(len)); } // Read the next data frame from the multiplexed stream. - match ready!(self.io.lock().poll_read_stream(cx, substream.id))? { + match ready!(this.io.lock().poll_read_stream(cx, this.id))? { Some(data) => { - substream.current_data = data; + this.current_data = data; } None => return Poll::Ready(Ok(0)), } } } +} - fn write_substream( - &self, +impl AsyncWrite for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_write( + self: Pin<&mut Self>, cx: &mut Context<'_>, - substream: &mut Self::Substream, buf: &[u8], - ) -> Poll> { - self.io.lock().poll_write_stream(cx, substream.id, buf) - } + ) -> Poll> { + let this = self.get_mut(); - fn flush_substream( - &self, - cx: &mut Context<'_>, - substream: &mut Self::Substream, - ) -> Poll> { - self.io.lock().poll_flush_stream(cx, substream.id) + this.io.lock().poll_write_stream(cx, this.id, buf) } - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - substream: &mut Self::Substream, - ) -> Poll> { - self.io.lock().poll_close_stream(cx, substream.id) - } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - fn destroy_substream(&self, sub: Self::Substream) { - self.io.lock().drop_stream(sub.id); + this.io.lock().poll_flush_stream(cx, this.id) } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.io.lock().poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let mut io = this.io.lock(); + + ready!(io.poll_close_stream(cx, this.id))?; + ready!(io.poll_flush_stream(cx, this.id))?; + + Poll::Ready(Ok(())) } } -/// Active attempt to open an outbound substream. -pub struct OutboundSubstream {} - /// Active substream to the remote. -pub struct Substream { +pub struct Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ /// The unique, local identifier of the substream. id: LocalStreamId, /// The current data frame the substream is reading from. current_data: Bytes, + /// Shared reference to the actual muxer. + io: Arc>>, } -impl Substream { - fn new(id: LocalStreamId) -> Self { +impl Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn new(id: LocalStreamId, io: Arc>>) -> Self { Self { id, current_data: Bytes::new(), + io, } } } + +impl Drop for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn drop(&mut self) { + self.io.lock().drop_stream(self.id); + } +} diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 96b608a68ad..d359d99947d 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -18,10 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{muxing, upgrade, Transport}; +use futures::{channel::oneshot, future::poll_fn, prelude::*}; +use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; -use std::sync::Arc; #[test] fn async_write() { @@ -49,7 +48,7 @@ fn async_write() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -60,8 +59,11 @@ fn async_write() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); @@ -74,9 +76,9 @@ fn async_write() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 77e1a09997b..b30e23d6403 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -18,10 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{muxing, upgrade, Transport}; +use futures::{channel::oneshot, future::poll_fn, prelude::*}; +use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; -use std::sync::Arc; #[test] fn client_to_server_outbound() { @@ -49,7 +48,7 @@ fn client_to_server_outbound() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -60,12 +59,15 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); - let mut buf = Vec::new(); - outbound.read_to_end(&mut buf).await.unwrap(); + let mut buf = vec![0u8; 11]; + outbound.read_exact(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); }); @@ -74,9 +76,9 @@ fn client_to_server_outbound() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() @@ -117,21 +119,19 @@ fn client_to_server_inbound() { tx.send(addr).unwrap(); - let client = Arc::new( - listener - .next() - .await - .unwrap() - .unwrap() - .into_upgrade() - .unwrap() - .0 - .await - .unwrap(), - ); + let mut client = listener + .next() + .await + .unwrap() + .unwrap() + .into_upgrade() + .unwrap() + .0 + .await + .unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() @@ -150,9 +150,13 @@ fn client_to_server_inbound() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -185,7 +189,7 @@ fn protocol_not_match() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -196,8 +200,11 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index de739b1b965..62047deebc9 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,6 +1,9 @@ # 0.37.0 - Update to `libp2p-core` `v0.33.0`. +- Remove `YamuxResult` and `YamuxError` from the public API. See [PR 2648]. + +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.36.0 [2022-02-22] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index fb4bc44ecb8..83fac210758 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,12 +24,10 @@ use futures::{ future, prelude::*, - ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::{OutboundSubstreamId, OutboundSubstreams, StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use parking_lot::Mutex; use std::{ fmt, io, iter, mem, pin::Pin, @@ -37,7 +35,14 @@ use std::{ }; /// A Yamux connection. -pub struct Yamux(Mutex>); +pub struct Yamux { + /// The [`futures::stream::Stream`] of incoming substreams. + incoming: S, + /// Handle to control the connection. + control: yamux::Control, + /// Controls the state handling around opening new outbound substreams. + outbound_substreams: OutboundSubstreams, +} impl fmt::Debug for Yamux { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -45,13 +50,6 @@ impl fmt::Debug for Yamux { } } -struct Inner { - /// The [`futures::stream::Stream`] of incoming substreams. - incoming: S, - /// Handle to control the connection. - control: yamux::Control, -} - /// A token to poll for an outbound substream. #[derive(Debug)] pub struct OpenSubstreamToken(()); @@ -64,15 +62,15 @@ where fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: Incoming { stream: yamux::into_stream(conn).err_into().boxed(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - - Yamux(Mutex::new(inner)) + outbound_substreams: OutboundSubstreams::default(), + } } } @@ -84,15 +82,15 @@ where fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: LocalIncoming { stream: yamux::into_stream(conn).err_into().boxed_local(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - - Yamux(Mutex::new(inner)) + outbound_substreams: OutboundSubstreams::default(), + } } } @@ -102,79 +100,35 @@ where S: Stream> + Unpin, { type Substream = yamux::Stream; - type OutboundSubstream = OpenSubstreamToken; - - fn poll_event( - &self, - c: &mut Context<'_>, - ) -> Poll>> { - let mut inner = self.0.lock(); - match ready!(inner.incoming.poll_next_unpin(c)) { - Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(to_io_error(e))), - None => Poll::Ready(Err(to_io_error(yamux::ConnectionError::Closed))), - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { - OpenSubstreamToken(()) - } - fn poll_outbound( - &self, - c: &mut Context<'_>, - _: &mut OpenSubstreamToken, - ) -> Poll> { - let mut inner = self.0.lock(); - Pin::new(&mut inner.control) - .poll_open_stream(c) - .map_err(to_io_error) - } - - fn destroy_outbound(&self, _: Self::OutboundSubstream) { - self.0.lock().control.abort_open_stream() - } + fn poll(&mut self, c: &mut Context<'_>) -> Poll>> { + if let Poll::Ready((stream, id)) = self + .outbound_substreams + .poll(|| Pin::new(&mut self.control).poll_open_stream(c)) + .map_err(to_io_error)? + { + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(stream, id))); + } - fn read_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - b: &mut [u8], - ) -> Poll> { - Pin::new(s).poll_read(c, b) - } + if let Poll::Ready(next) = self.incoming.poll_next_unpin(c) { + let event = next + .transpose() + .map_err(to_io_error)? + .map(StreamMuxerEvent::InboundSubstream) + .ok_or_else(|| to_io_error(yamux::ConnectionError::Closed))?; - fn write_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - b: &[u8], - ) -> Poll> { - Pin::new(s).poll_write(c, b) - } + return Poll::Ready(Ok(event)); + } - fn flush_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_flush(c) + Poll::Pending } - fn shutdown_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_close(c) + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.outbound_substreams.open_new() } - fn destroy_substream(&self, _: Self::Substream) {} - - fn poll_close(&self, c: &mut Context<'_>) -> Poll> { - let mut inner = self.0.lock(); - - if let Poll::Ready(()) = Pin::new(&mut inner.control) + fn poll_close(&mut self, c: &mut Context<'_>) -> Poll> { + if let Poll::Ready(()) = Pin::new(&mut self.control) .poll_close(c) .map_err(to_io_error)? { @@ -182,7 +136,7 @@ where } while let Poll::Ready(maybe_inbound_stream) = - inner.incoming.poll_next_unpin(c).map_err(to_io_error)? + self.incoming.poll_next_unpin(c).map_err(to_io_error)? { match maybe_inbound_stream { Some(inbound_stream) => mem::drop(inbound_stream), diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f55f0367b8c..90eb372f370 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -21,7 +21,6 @@ mod error; mod handler_wrapper; mod listeners; -mod substream; pub(crate) mod pool; @@ -32,17 +31,18 @@ pub use error::{ pub use listeners::{ListenersEvent, ListenersStream}; pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; -pub use substream::{Close, Substream, SubstreamEndpoint}; use crate::handler::ConnectionHandler; +use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::StreamMuxerBox; -use libp2p_core::upgrade; +use libp2p_core::muxing::{OutboundSubstreamId, StreamMuxerBox, StreamMuxerEvent}; use libp2p_core::PeerId; -use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; -use substream::{Muxing, SubstreamEvent}; +use libp2p_core::{upgrade, StreamMuxer}; +use std::collections::HashMap; +use std::future::Future; +use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -68,7 +68,11 @@ where THandler: ConnectionHandler, { /// Node that handles the muxing. - muxing: substream::Muxing>, + muxing: StreamMuxerBox, + + /// Temporary storage of OutboundOpenInfo whilst the substreams are opening. + open_info: HashMap>, + /// Handler that processes substreams. handler: HandlerWrapper, } @@ -76,10 +80,11 @@ where impl fmt::Debug for Connection where THandler: ConnectionHandler + fmt::Debug, + handler_wrapper::OutboundOpenInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") - .field("muxing", &self.muxing) + .field("open_info", &self.open_info) .field("handler", &self.handler) .finish() } @@ -105,7 +110,8 @@ where max_negotiating_inbound_streams, ); Connection { - muxing: Muxing::new(muxer), + muxing: muxer, + open_info: HashMap::default(), handler: wrapped_handler, } } @@ -117,10 +123,10 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (THandler, Close) { + pub fn close(mut self) -> (THandler, impl Future>) { ( self.handler.into_connection_handler(), - self.muxing.close().0, + poll_fn(move |cx| self.muxing.poll_close(cx)), ) } @@ -135,7 +141,8 @@ where match self.handler.poll(cx) { Poll::Pending => {} Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { - self.muxing.open_substream(user_data); + let id = self.muxing.open_outbound(); + self.open_info.insert(id, user_data); continue; } Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { @@ -148,20 +155,19 @@ where // of new substreams. match self.muxing.poll(cx) { Poll::Pending => {} - Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => { + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); continue; } - Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })) => { + Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream, id))) => { + let user_data = self.open_info.remove(&id).expect("unknown substream id"); let endpoint = SubstreamEndpoint::Dialer(user_data); + self.handler.inject_substream(substream, endpoint); continue; } - Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { + Poll::Ready(Ok(StreamMuxerEvent::AddressChange(address))) => { self.handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } @@ -173,6 +179,13 @@ where } } +/// Endpoint for a received substream. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum SubstreamEndpoint { + Dialer(TDialInfo), + Listener, +} + /// Borrowed information about an incoming connection currently being negotiated. #[derive(Debug, Copy, Clone)] pub struct IncomingInfo<'a> { diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 05e389d6134..c11b0b40f76 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::{Substream, SubstreamEndpoint}; +use crate::connection::SubstreamEndpoint; use crate::handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, }; @@ -28,8 +28,8 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; use futures_timer::Delay; use instant::Instant; +use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ - muxing::StreamMuxerBox, upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, Multiaddr, }; @@ -52,20 +52,14 @@ where negotiating_in: FuturesUnordered< SubstreamUpgrade< TConnectionHandler::InboundOpenInfo, - InboundUpgradeApply< - Substream, - SendWrapper, - >, + InboundUpgradeApply>, >, >, /// Futures that upgrade outgoing substreams. negotiating_out: FuturesUnordered< SubstreamUpgrade< TConnectionHandler::OutboundOpenInfo, - OutboundUpgradeApply< - Substream, - SendWrapper, - >, + OutboundUpgradeApply>, >, >, /// For each outbound substream request, how to upgrade it. The first element of the tuple @@ -248,7 +242,7 @@ where { pub fn inject_substream( &mut self, - substream: Substream, + substream: SubstreamBox, // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). endpoint: SubstreamEndpoint>, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index a8f28beeadc..740bbbf9632 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -38,7 +38,8 @@ use futures::{ stream::FuturesUnordered, }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::StreamMuxer; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -604,7 +605,7 @@ where match event { task::PendingConnectionEvent::ConnectionEstablished { id, - output: (obtained_peer_id, muxer), + output: (obtained_peer_id, mut muxer), outgoing, } => { let PendingConnectionInfo { diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs deleted file mode 100644 index b7e3d3bad87..00000000000 --- a/swarm/src/connection/substream.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::prelude::*; -use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{substream_from_ref, StreamMuxer, StreamMuxerEvent, SubstreamRef}; -use smallvec::SmallVec; -use std::sync::Arc; -use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll}; - -/// Endpoint for a received substream. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SubstreamEndpoint { - Dialer(TDialInfo), - Listener, -} - -/// Implementation of `Stream` that handles substream multiplexing. -/// -/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying -/// the `Muxing` will **not** close the existing substreams. -/// -/// The stream will close once both the inbound and outbound channels are closed, and no more -/// outbound substream attempt is pending. -pub struct Muxing -where - TMuxer: StreamMuxer, -{ - /// The muxer used to manage substreams. - inner: Arc, - /// List of substreams we are currently opening. - outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, -} - -/// Future that signals the remote that we have closed the connection. -pub struct Close { - /// Muxer to close. - muxer: Arc, -} - -/// A successfully opened substream. -pub type Substream = SubstreamRef>; - -/// Event that can happen on the `Muxing`. -pub enum SubstreamEvent -where - TMuxer: StreamMuxer, -{ - /// A new inbound substream arrived. - InboundSubstream { - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: Substream, - }, - - /// An outbound substream has successfully been opened. - OutboundSubstream { - /// User data that has been passed to the `open_substream` method. - user_data: TUserData, - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: Substream, - }, - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -/// Identifier for a substream being opened. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct OutboundSubstreamId(usize); - -impl Muxing -where - TMuxer: StreamMuxer, -{ - /// Creates a new node events stream. - pub fn new(muxer: TMuxer) -> Self { - Muxing { - inner: Arc::new(muxer), - outbound_substreams: SmallVec::new(), - } - } - - /// Starts the process of opening a new outbound substream. - /// - /// After calling this method, polling the stream should eventually produce either an - /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has - /// been passed to this method. - pub fn open_substream(&mut self, user_data: TUserData) { - let raw = self.inner.open_outbound(); - self.outbound_substreams.push((user_data, raw)); - } - - /// Destroys the node stream and returns all the pending outbound substreams, plus an object - /// that signals the remote that we shut down the connection. - #[must_use] - pub fn close(mut self) -> (Close, Vec) { - let substreams = self.cancel_outgoing(); - let close = Close { - muxer: self.inner.clone(), - }; - (close, substreams) - } - - /// Destroys all outbound streams and returns the corresponding user data. - pub fn cancel_outgoing(&mut self) -> Vec { - let mut out = Vec::with_capacity(self.outbound_substreams.len()); - for (user_data, outbound) in self.outbound_substreams.drain(..) { - out.push(user_data); - self.inner.destroy_outbound(outbound); - } - out - } - - /// Provides an API similar to `Future`. - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, IoError>> { - // Polling inbound substream. - match self.inner.poll_event(cx) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { - let substream = substream_from_ref(self.inner.clone(), substream); - return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); - } - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { - return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => {} - } - - // Polling outbound substreams. - // We remove each element from `outbound_substreams` one by one and add them back. - for n in (0..self.outbound_substreams.len()).rev() { - let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n); - match self.inner.poll_outbound(cx, &mut outbound) { - Poll::Ready(Ok(substream)) => { - let substream = substream_from_ref(self.inner.clone(), substream); - self.inner.destroy_outbound(outbound); - return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })); - } - Poll::Pending => { - self.outbound_substreams.push((user_data, outbound)); - } - Poll::Ready(Err(err)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err)); - } - } - } - - // Nothing happened. Register our task to be notified and return. - Poll::Pending - } -} - -impl fmt::Debug for Muxing -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Muxing") - .field("outbound_substreams", &self.outbound_substreams.len()) - .finish() - } -} - -impl Drop for Muxing -where - TMuxer: StreamMuxer, -{ - fn drop(&mut self) { - // The substreams that were produced will continue to work, as the muxer is held in an Arc. - // However we will no longer process any further inbound or outbound substream, and we - // therefore close everything. - for (_, outbound) in self.outbound_substreams.drain(..) { - self.inner.destroy_outbound(outbound); - } - } -} - -impl Future for Close -where - TMuxer: StreamMuxer, -{ - type Output = Result<(), IoError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.muxer.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - } - } -} - -impl fmt::Debug for Close -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Close").finish() - } -} - -impl fmt::Debug for SubstreamEvent -where - TMuxer: StreamMuxer, - TMuxer::Substream: fmt::Debug, - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SubstreamEvent::InboundSubstream { substream } => f - .debug_struct("SubstreamEvent::OutboundClosed") - .field("substream", substream) - .finish(), - SubstreamEvent::OutboundSubstream { - user_data, - substream, - } => f - .debug_struct("SubstreamEvent::OutboundSubstream") - .field("user_data", user_data) - .field("substream", substream) - .finish(), - SubstreamEvent::AddressChange(address) => f - .debug_struct("SubstreamEvent::AddressChange") - .field("address", address) - .finish(), - } - } -} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 90b158dbf1e..c5ef1b0aa88 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -79,11 +79,12 @@ pub use handler::{ pub use registry::{AddAddressResult, AddressRecord, AddressScore}; use connection::pool::{Pool, PoolConfig, PoolEvent}; -use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream, Substream}; +use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream}; use dial_opts::{DialOpts, PeerCondition}; use either::Either; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::{ConnectionId, PendingPoint}; +use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ connection::{ConnectedPoint, ListenerId}, multiaddr::Protocol, @@ -110,7 +111,7 @@ use upgrade::UpgradeInfoSend as _; /// /// Implements the [`AsyncRead`](futures::io::AsyncRead) and /// [`AsyncWrite`](futures::io::AsyncWrite) traits. -pub type NegotiatedSubstream = Negotiated>; +pub type NegotiatedSubstream = Negotiated; /// Event generated by the [`NetworkBehaviour`] that the swarm will report back. type TBehaviourOutEvent = ::OutEvent; From 4b6144ce7a3a6394f0d478dd6caa356ae062f2d7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 15 Jun 2022 19:52:55 +0200 Subject: [PATCH 4/7] fixup! Re-design the `StreamMuxer` trait Reduce diff. --- core/src/muxing/boxed.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index a95331e62b1..101790903fa 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -25,10 +25,10 @@ struct Wrap { inner: T, } -impl StreamMuxer for Wrap +impl StreamMuxer for Wrap where - T: StreamMuxer, - S: AsyncRead + AsyncWrite + Send + Unpin + 'static, + T: StreamMuxer, + T::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Substream = SubstreamBox; From 32bb1901af8bca7599d9e24a3143b6ac6aece750 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 15 Jun 2022 19:56:24 +0200 Subject: [PATCH 5/7] fixup! Re-design the `StreamMuxer` trait Reduce diff. --- core/src/muxing/boxed.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 101790903fa..6472e87edd9 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -52,12 +52,15 @@ where impl StreamMuxerBox { /// Construct a new [`StreamMuxerBox`]. - pub fn new(muxer: impl StreamMuxer + Send + 'static) -> Self + pub fn new(muxer: T) -> StreamMuxerBox where - S: AsyncRead + AsyncWrite + Send + Unpin + 'static, + T: StreamMuxer + Send + 'static, + T::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - Self { - inner: Box::new(Wrap { inner: muxer }), + let wrap = Wrap { inner: muxer }; + + StreamMuxerBox { + inner: Box::new(wrap), } } } From 377187c0d1e0d93d93a4cbcbaa0f9044b9951b7e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 15 Jun 2022 19:57:30 +0200 Subject: [PATCH 6/7] fixup! Re-design the `StreamMuxer` trait Fix compile errors. --- core/src/muxing/boxed.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 6472e87edd9..cf36237cf3e 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -54,7 +54,7 @@ impl StreamMuxerBox { /// Construct a new [`StreamMuxerBox`]. pub fn new(muxer: T) -> StreamMuxerBox where - T: StreamMuxer + Send + 'static, + T: StreamMuxer + Send + 'static, T::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { let wrap = Wrap { inner: muxer }; From 5d628bcbd09cdf6ba7225939078331c456adb2e2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 15 Jun 2022 20:22:30 +0200 Subject: [PATCH 7/7] fixup! Re-design the `StreamMuxer` trait Fix docs error. --- swarm/src/connection/handler_wrapper.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index c11b0b40f76..e09efb83082 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -28,8 +28,8 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; use futures_timer::Delay; use instant::Instant; -use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ + muxing::SubstreamBox, upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, Multiaddr, }; @@ -78,8 +78,8 @@ where /// Note: This only enforces a limit on the number of concurrently /// negotiating inbound streams. The total number of inbound streams on a /// connection is the sum of negotiating and negotiated streams. A limit on - /// the total number of streams can be enforced at the [`StreamMuxerBox`] - /// level. + /// the total number of streams can be enforced at the + /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. max_negotiating_inbound_streams: usize, }