From cc75a1a27eee39cd6e473472b6d4815f326e687e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 19 May 2022 17:40:29 +0200 Subject: [PATCH] Re-design the `StreamMuxer` trait The current `StreamMuxer` API is based around a poll-based approach where each substream needs to be passed by value to read/write/close it. The muxer itself however is not available in the end-user abstractions like `ConnectionHandler`. To circumvent this, the `StreamMuxerBox` type exists which allows a `StreamMuxer` to be cloned. Together with `SubstreamRef`, this allows each substream to own a reference to the muxer it was created with and pass itself to the muxer within implementations of `AsyncRead` and `AsyncWrite`. These implementations are convenient as they allow an end-user to simply read and write to the substream without holding a reference to anything else. We can achieve the same goal by changing the `StreamMuxer` abstraction to always return a `AsyncReadWriteBox`. Depending on the implementation of the muxer, this may require more code within its implementation of the `StreamMuxer` trait. yamux already provides a `Stream` type that implements `AsyncRead` and `AsyncWrite` and we can thus directly construct a `AsyncReadWriteBox`. For mplex, we wrap the actual muxer in an `Arc` and store it within the `Substream` struct. This allowsus to call the various `poll_` functions from within the `AsyncRead` and `AsyncWrite` implementations. --- core/CHANGELOG.md | 2 + core/src/either.rs | 150 +----- core/src/muxing.rs | 679 +++--------------------- core/src/muxing/async_read_write_box.rs | 105 ++++ core/src/muxing/singleton.rs | 117 +--- core/src/transport/upgrade.rs | 6 +- core/tests/util.rs | 4 +- muxers/mplex/CHANGELOG.md | 3 + muxers/mplex/benches/split_send_size.rs | 64 +-- muxers/mplex/src/lib.rs | 181 ++++--- muxers/mplex/tests/async_write.rs | 15 +- muxers/mplex/tests/two_peers.rs | 58 +- muxers/yamux/CHANGELOG.md | 3 + muxers/yamux/src/lib.rs | 125 ++--- protocols/rendezvous/tests/harness.rs | 1 - swarm/src/connection.rs | 40 +- swarm/src/connection/handler_wrapper.rs | 10 +- swarm/src/connection/pool.rs | 4 +- swarm/src/connection/substream.rs | 231 -------- swarm/src/lib.rs | 5 +- 20 files changed, 463 insertions(+), 1340 deletions(-) create mode 100644 core/src/muxing/async_read_write_box.rs diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 3a2cffea52a3..93d055f91ffb 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,8 +1,10 @@ # 0.33.0 [unreleased] - Have methods on `Transport` take `&mut self` instead of `self`. See [PR 2529]. +- Re-design `StreamMuxer` trait to take `&mut self` and remove all associated types. See [PR 2648]. [PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529 +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.32.1 diff --git a/core/src/either.rs b/core/src/either.rs index 2dd01813f7e8..09c4f88d92f3 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::muxing::OutboundSubstreamId; use crate::{ muxing::{StreamMuxer, StreamMuxerEvent}, transport::{ListenerEvent, Transport, TransportError}, @@ -201,149 +202,21 @@ where A: StreamMuxer, B: StreamMuxer, { - type Substream = EitherOutput; - type OutboundSubstream = EitherOutbound; - - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>> { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => inner.poll_event(cx).map(|result| { - result.map(|event| match event { - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream)) - } - }) - }), - EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| { - result.map(|event| match event { - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream)) - } - }) - }), - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { - match self { - EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()), - EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()), - } - } - - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)), - (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)), - _ => panic!("Wrong API usage"), - } - } - - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - match self { - EitherOutput::First(inner) => match substream { - EitherOutbound::A(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutbound::B(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - } - } - - fn read_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.read_substream(cx, sub, buf) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.read_substream(cx, sub, buf) - } - _ => panic!("Wrong API usage"), - } - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.write_substream(cx, sub, buf) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.write_substream(cx, sub, buf) - } - _ => panic!("Wrong API usage"), - } - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.flush_substream(cx, sub) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.flush_substream(cx, sub) - } - _ => panic!("Wrong API usage"), + EitherOutput::First(inner) => inner.poll(cx), + EitherOutput::Second(inner) => inner.poll(cx), } } - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.shutdown_substream(cx, sub) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.shutdown_substream(cx, sub) - } - _ => panic!("Wrong API usage"), - } - } - - fn destroy_substream(&self, substream: Self::Substream) { + fn open_outbound(&mut self) -> OutboundSubstreamId { match self { - EitherOutput::First(inner) => match substream { - EitherOutput::First(substream) => inner.destroy_substream(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutput::Second(substream) => inner.destroy_substream(substream), - _ => panic!("Wrong API usage"), - }, + EitherOutput::First(inner) => inner.open_outbound(), + EitherOutput::Second(inner) => inner.open_outbound(), } } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner.poll_close(cx), EitherOutput::Second(inner) => inner.poll_close(cx), @@ -351,13 +224,6 @@ where } } -#[derive(Debug, Copy, Clone)] -#[must_use = "futures do nothing unless polled"] -pub enum EitherOutbound { - A(A::OutboundSubstream), - B(B::OutboundSubstream), -} - /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. #[pin_project(project = EitherListenStreamProj)] #[derive(Debug, Copy, Clone)] diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 0072a5c7ea98..a6564cb82a6d 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -21,8 +21,7 @@ //! Muxing is the process of splitting a connection into multiple substreams. //! //! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer` -//! has ownership of a connection, lets you open and close substreams, and read/write data -//! on open substreams. +//! has ownership of a connection, lets you open and close substreams. //! //! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this //! > is managed by the library's internals. @@ -51,175 +50,60 @@ //! The upgrade process will take ownership of the connection, which makes it possible for the //! implementation of `StreamMuxer` to control everything that happens on the wire. -use fnv::FnvHashMap; -use futures::{future, prelude::*, task::Context, task::Poll}; +use futures::{task::Context, task::Poll}; use multiaddr::Multiaddr; -use parking_lot::Mutex; -use std::{ - fmt, io, - ops::Deref, - pin::Pin, - sync::atomic::{AtomicUsize, Ordering}, -}; +use std::collections::VecDeque; +use std::io; +pub use self::async_read_write_box::AsyncReadWriteBox; pub use self::singleton::SingletonMuxer; +mod async_read_write_box; mod singleton; -/// Implemented on objects that can open and manage substreams. -/// -/// The state of a muxer, as exposed by this API, is the following: -/// -/// - A connection to the remote. The `poll_event`, `flush_all` and `close` methods operate -/// on this. -/// - A list of substreams that are open. The `poll_outbound`, `read_substream`, `write_substream`, -/// `flush_substream`, `shutdown_substream` and `destroy_substream` methods allow controlling -/// these entries. -/// - A list of outbound substreams being opened. The `open_outbound`, `poll_outbound` and -/// `destroy_outbound` methods allow controlling these entries. +/// Provides multiplexing for a connection by allowing users to open substreams. /// +/// A [`StreamMuxer`] is a stateful object and needs to be continuously polled to make progress. +/// Implementations need to construct [`AsyncReadWriteBox`]es out of their concrete substreams. +/// Using a concrete type rather than an associated type simplifies downstream usage of a [`StreamMuxer`]. pub trait StreamMuxer { - /// Type of the object that represents the raw substream where data can be read and written. - type Substream; - - /// Future that will be resolved when the outgoing substream is open. - type OutboundSubstream; - - /// Polls for a connection-wide event. - /// - /// This function behaves the same as a `Stream`. - /// - /// If `Pending` is returned, then the current task will be notified once the muxer - /// is ready to be polled, similar to the API of `Stream::poll()`. - /// Only the latest task that was used to call this method may be notified. + /// Polls the [`StreamMuxer`] to make progress. /// /// It is permissible and common to use this method to perform background /// work, such as processing incoming packets and polling timers. /// /// An error can be generated if the connection has been closed. - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>>; - - /// Opens a new outgoing substream, and produces the equivalent to a future that will be - /// resolved when it becomes available. - /// - /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced - /// through the methods on the `StreamMuxer` trait. - fn open_outbound(&self) -> Self::OutboundSubstream; + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>; - /// Polls the outbound substream. + /// Instruct this [`StreamMuxer`] to open a new outbound substream. /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be polled, similar to the API of `Future::poll()`. - /// However, for each individual outbound substream, only the latest task that was used to - /// call this method may be notified. - /// - /// May panic or produce an undefined result if an earlier polling of the same substream - /// returned `Ready` or `Err`. - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll>; - - /// Destroys an outbound substream future. Use this after the outbound substream has finished, - /// or if you want to interrupt it. - fn destroy_outbound(&self, s: Self::OutboundSubstream); - - /// Reads data from a substream. The behaviour is the same as `futures::AsyncRead::poll_read`. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. However, for each individual substream, only the latest task that - /// was used to call this method may be notified. - /// - /// If `Async::Ready(0)` is returned, the substream has been closed by the remote and should - /// no longer be read afterwards. - /// - /// An error can be generated if the connection has been closed, or if a protocol misbehaviour - /// happened. - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll>; + /// This function returns instantly with a new ID but the substream has not been opened at that + /// point! When the substream is ready to be used, you will receive a + /// [`StreamMuxerEvent::OutboundSubstream`] event with the same ID as returned here. + fn open_outbound(&mut self) -> OutboundSubstreamId; - /// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. For each individual substream, only the latest task that was used to - /// call this method may be notified. - /// - /// Calling `write_substream` does not guarantee that data will arrive to the remote. To - /// ensure that, you should call `flush_substream`. - /// - /// It is incorrect to call this method on a substream if you called `shutdown_substream` on - /// this substream earlier. - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll>; - - /// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`. - /// - /// After this method has been called, data written earlier on the substream is guaranteed to - /// be received by the remote. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. For each individual substream, only the latest task that was used to - /// call this method may be notified. - /// - /// > **Note**: This method may be implemented as a call to `flush_all`. - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll>; - - /// Attempts to shut down the writing side of a substream. The behaviour is similar to - /// `AsyncWrite::poll_close`. - /// - /// Contrary to `AsyncWrite::poll_close`, shutting down a substream does not imply - /// `flush_substream`. If you want to make sure that the remote is immediately informed about - /// the shutdown, use `flush_substream` or `flush_all`. - /// - /// After this method has been called, you should no longer attempt to write to this substream. - /// - /// An error can be generated if the connection has been closed, or if a protocol misbehaviour - /// happened. - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll>; - - /// Destroys a substream. - fn destroy_substream(&self, s: Self::Substream); - - /// Closes this `StreamMuxer`. + /// Closes this [`StreamMuxer`]. /// /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns, /// or polls must generate an error or be ignored. /// - /// Calling this method implies `flush_all`. - /// /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. - fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll>; } /// Event about a connection, reported by an implementation of [`StreamMuxer`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum StreamMuxerEvent { +#[derive(Debug)] +pub enum StreamMuxerEvent { /// Remote has opened a new substream. Contains the substream in question. - InboundSubstream(T), + InboundSubstream(AsyncReadWriteBox), + + /// We have opened a new substream. Contains the substream and the ID returned + /// from [`StreamMuxer::open_outbound`]. + OutboundSubstream(AsyncReadWriteBox, OutboundSubstreamId), /// Address to the remote has changed. The previous one is now obsolete. /// @@ -228,498 +112,81 @@ pub enum StreamMuxerEvent { AddressChange(Multiaddr), } -impl StreamMuxerEvent { +impl StreamMuxerEvent { /// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise /// returns `None`. - pub fn into_inbound_substream(self) -> Option { + pub fn into_inbound_substream(self) -> Option { if let StreamMuxerEvent::InboundSubstream(s) = self { Some(s) } else { None } } -} - -/// Polls for an event from the muxer and, if an inbound substream, wraps this substream in an -/// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. -pub fn event_from_ref_and_wrap

( - muxer: P, -) -> impl Future>>> -where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - let muxer2 = muxer.clone(); - future::poll_fn(move |cx| muxer.poll_event(cx)).map_ok(|event| match event { - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(substream_from_ref(muxer2, substream)) - } - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - }) -} - -/// Same as `outbound_from_ref`, but wraps the output in an object that -/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. -pub fn outbound_from_ref_and_wrap

(muxer: P) -> OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - let inner = outbound_from_ref(muxer); - OutboundSubstreamRefWrapFuture { inner } -} - -/// Future returned by `outbound_from_ref_and_wrap`. -pub struct OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - inner: OutboundSubstreamRefFuture

, -} - -impl

Future for OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - type Output = io::Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Future::poll(Pin::new(&mut self.inner), cx) { - Poll::Ready(Ok(substream)) => { - let out = substream_from_ref(self.inner.muxer.clone(), substream); - Poll::Ready(Ok(out)) - } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - } - } -} - -/// Builds a new future for an outbound substream, where the muxer is a reference. -pub fn outbound_from_ref

(muxer: P) -> OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - let outbound = muxer.open_outbound(); - OutboundSubstreamRefFuture { - muxer, - outbound: Some(outbound), - } -} - -/// Future returned by `outbound_from_ref`. -pub struct OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - muxer: P, - outbound: Option<::OutboundSubstream>, -} -impl

Unpin for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ -} - -impl

Future for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - type Output = io::Result<::Substream>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - this.muxer - .poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty")) - } -} - -impl

Drop for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn drop(&mut self) { - self.muxer - .destroy_outbound(self.outbound.take().expect("outbound was empty")) - } -} - -/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the -/// muxer and a substream. -pub fn substream_from_ref

( - muxer: P, - substream: ::Substream, -) -> SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - SubstreamRef { - muxer, - substream: Some(substream), - shutdown_state: ShutdownState::Shutdown, - } -} - -/// Stream returned by `substream_from_ref`. -pub struct SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - muxer: P, - substream: Option<::Substream>, - shutdown_state: ShutdownState, -} - -enum ShutdownState { - Shutdown, - Flush, - Done, -} - -impl

fmt::Debug for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, - ::Substream: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(f, "Substream({:?})", self.substream) - } -} - -impl

Unpin for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ -} - -impl

AsyncRead for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.read_substream(cx, s, buf) - } -} - -impl

AsyncWrite for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.write_substream(cx, s, buf) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - loop { - match this.shutdown_state { - ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) { - Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }, - ShutdownState::Flush => match this.muxer.flush_substream(cx, s) { - Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }, - ShutdownState::Done => { - return Poll::Ready(Ok(())); - } - } + /// If `self` is a [`StreamMuxerEvent::OutboundSubstream`], returns the content. Otherwise + /// returns `None`. + pub fn into_outbound_substream(self) -> Option<(AsyncReadWriteBox, OutboundSubstreamId)> { + if let StreamMuxerEvent::OutboundSubstream(s, id) = self { + Some((s, id)) + } else { + None } } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.flush_substream(cx, s) - } -} - -impl

Drop for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn drop(&mut self) { - self.muxer - .destroy_substream(self.substream.take().expect("substream was empty")) - } } /// Abstract `StreamMuxer`. -pub struct StreamMuxerBox { - inner: Box + Send + Sync>, -} +pub type StreamMuxerBox = Box; -impl StreamMuxerBox { - /// Turns a stream muxer into a `StreamMuxerBox`. - pub fn new(muxer: T) -> StreamMuxerBox - where - T: StreamMuxer + Send + Sync + 'static, - T::OutboundSubstream: Send, - T::Substream: Send, - { - let wrap = Wrap { - inner: muxer, - substreams: Mutex::new(Default::default()), - next_substream: AtomicUsize::new(0), - outbound: Mutex::new(Default::default()), - next_outbound: AtomicUsize::new(0), - }; +#[derive(Default, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct OutboundSubstreamId(u64); - StreamMuxerBox { - inner: Box::new(wrap), - } - } -} +impl OutboundSubstreamId { + pub(crate) const ZERO: OutboundSubstreamId = OutboundSubstreamId(0); -impl StreamMuxer for StreamMuxerBox { - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype + fn fetch_and_increment(&mut self) -> Self { + let next_id = Self(self.0); - #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>> { - self.inner.poll_event(cx) - } + self.0 += 1; - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - self.inner.open_outbound() - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll> { - self.inner.poll_outbound(cx, s) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - self.inner.destroy_outbound(substream) - } - - #[inline] - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - self.inner.read_substream(cx, s, buf) - } - - #[inline] - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - self.inner.write_substream(cx, s, buf) - } - - #[inline] - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.flush_substream(cx, s) - } - - #[inline] - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.shutdown_substream(cx, s) - } - - #[inline] - fn destroy_substream(&self, s: Self::Substream) { - self.inner.destroy_substream(s) - } - - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) + next_id } } -struct Wrap -where - T: StreamMuxer, -{ - inner: T, - substreams: Mutex>, - next_substream: AtomicUsize, - outbound: Mutex>, - next_outbound: AtomicUsize, +/// Utility state machine for muxers to simplify the generation of new [`OutboundSubstreamId`]s +/// and the state handling around when new outbound substreams should be opened. +#[derive(Default)] +pub struct OutboundSubstreams { + current_id: OutboundSubstreamId, + substreams_to_open: VecDeque, } -impl StreamMuxer for Wrap -where - T: StreamMuxer, -{ - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype - - #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>> { - let substream = match self.inner.poll_event(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { - return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) - } - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - }; - - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id))) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - let outbound = self.inner.open_outbound(); - let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); - self.outbound.lock().insert(id, outbound); - id - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut list = self.outbound.lock(); - let substream = match self - .inner - .poll_outbound(cx, list.get_mut(substream).unwrap()) - { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - }; - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(id)) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - let mut list = self.outbound.lock(); - self.inner - .destroy_outbound(list.remove(&substream).unwrap()) - } +impl OutboundSubstreams { + /// Instruct this state machine to open a new outbound substream. + pub fn open_new(&mut self) -> OutboundSubstreamId { + let next_id = self.current_id.fetch_and_increment(); + self.substreams_to_open.push_back(next_id); - #[inline] - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner.read_substream(cx, list.get_mut(s).unwrap(), buf) + next_id } - #[inline] - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .write_substream(cx, list.get_mut(s).unwrap(), buf) - } - - #[inline] - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner.flush_substream(cx, list.get_mut(s).unwrap()) - } - - #[inline] - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner.shutdown_substream(cx, list.get_mut(s).unwrap()) - } + /// Allow this state machine to make progress. + /// + /// In case a new substream was requested, we invoke the provided poll function and return the + /// created substream along with its ID. + pub fn poll( + &mut self, + poll_open: impl FnOnce() -> Poll>, + ) -> Poll> { + if self.substreams_to_open.is_empty() { + return Poll::Pending; + } - #[inline] - fn destroy_substream(&self, substream: Self::Substream) { - let mut list = self.substreams.lock(); - self.inner - .destroy_substream(list.remove(&substream).unwrap()) - } + let stream = futures::ready!(poll_open())?; + let id = self + .substreams_to_open + .pop_front() + .expect("we checked that we are not empty"); - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) + Poll::Ready(Ok((stream, id))) } } diff --git a/core/src/muxing/async_read_write_box.rs b/core/src/muxing/async_read_write_box.rs new file mode 100644 index 000000000000..928393329cc7 --- /dev/null +++ b/core/src/muxing/async_read_write_box.rs @@ -0,0 +1,105 @@ +// Copyright 2022 Thomas Eizinger. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{AsyncRead, AsyncWrite}; +use std::fmt; +use std::io::{IoSlice, IoSliceMut}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Abstract type for asynchronous reading and writing. +/// +/// An [`AsyncReadWriteBox`] erases the concrete type it is given and only retains its `AsyncRead` +/// and `AsyncWrite` capabilities. +pub struct AsyncReadWriteBox(Box); + +impl AsyncReadWriteBox { + /// Construct a new [`AsyncReadWriteBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`]. + pub fn new(stream: S) -> Self { + Self(Box::new(stream)) + } +} + +impl fmt::Debug for AsyncReadWriteBox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AsyncReadWriteBox({})", self.0.type_name()) + } +} + +/// Workaround because Rust does not allow `Box`. +trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { + /// Helper function to capture the erased inner type. + /// + /// Used to make the `Debug` implementation of `AsyncReadWriteBox` more useful. + fn type_name(&self) -> &'static str; +} + +impl AsyncReadWrite for S +where + S: AsyncRead + AsyncWrite + Unpin, +{ + fn type_name(&self) -> &'static str { + std::any::type_name::() + } +} + +impl AsyncRead for AsyncReadWriteBox { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) + } +} + +impl AsyncWrite for AsyncReadWriteBox { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_close(cx) + } +} diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 556cc66a7ffe..91f6bbe7e6b3 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,20 +18,15 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::muxing::AsyncReadWriteBox; +use crate::muxing::OutboundSubstreamId; use crate::{ connection::Endpoint, muxing::{StreamMuxer, StreamMuxerEvent}, }; use futures::prelude::*; -use parking_lot::Mutex; -use std::{ - io, - pin::Pin, - sync::atomic::{AtomicBool, Ordering}, - task::Context, - task::Poll, -}; +use std::{io, task::Context, task::Poll}; /// Implementation of `StreamMuxer` that allows only one substream on top of a connection, /// yielding the connection itself. @@ -40,9 +35,7 @@ use std::{ /// Most notably, no protocol is negotiated. pub struct SingletonMuxer { /// The inner connection. - inner: Mutex, - /// If true, a substream has been produced and any further attempt should fail. - substream_extracted: AtomicBool, + inner: Option, /// Our local endpoint. Always the same value as was passed to `new`. endpoint: Endpoint, } @@ -54,101 +47,37 @@ impl SingletonMuxer { /// If `endpoint` is `Listener`, then only one inbound substream will be permitted. pub fn new(inner: TSocket, endpoint: Endpoint) -> Self { SingletonMuxer { - inner: Mutex::new(inner), - substream_extracted: AtomicBool::new(false), + inner: Some(inner), endpoint, } } } -/// Substream of the `SingletonMuxer`. -pub struct Substream {} -/// Outbound substream attempt of the `SingletonMuxer`. -pub struct OutboundSubstream {} - impl StreamMuxer for SingletonMuxer where - TSocket: AsyncRead + AsyncWrite + Unpin, + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Substream = Substream; - type OutboundSubstream = OutboundSubstream; - - fn poll_event( - &self, - _: &mut Context<'_>, - ) -> Poll>> { - match self.endpoint { - Endpoint::Dialer => return Poll::Pending, - Endpoint::Listener => {} - } - - if !self.substream_extracted.swap(true, Ordering::Relaxed) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream {}))) - } else { - Poll::Pending - } + fn poll(&mut self, _: &mut Context<'_>) -> Poll> { + let stream = match self.inner.take() { + None => return Poll::Pending, + Some(stream) => AsyncReadWriteBox::new(stream), + }; + + let event = match self.endpoint { + Endpoint::Dialer => { + StreamMuxerEvent::OutboundSubstream(stream, OutboundSubstreamId::ZERO) + } + Endpoint::Listener => StreamMuxerEvent::InboundSubstream(stream), + }; + + Poll::Ready(Ok(event)) } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} - } - - fn poll_outbound( - &self, - _: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - match self.endpoint { - Endpoint::Listener => return Poll::Pending, - Endpoint::Dialer => {} - } - - if !self.substream_extracted.swap(true, Ordering::Relaxed) { - Poll::Ready(Ok(Substream {})) - } else { - Poll::Pending - } + fn open_outbound(&mut self) -> OutboundSubstreamId { + OutboundSubstreamId::ZERO } - fn destroy_outbound(&self, _: Self::OutboundSubstream) {} - - fn read_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf) - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut *self.inner.lock()), cx, buf) - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - ) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx) - } - - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - ) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut *self.inner.lock()), cx) - } - - fn destroy_substream(&self, _: Self::Substream) {} - - fn poll_close(&self, _: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 3930adbfd175..2c98a675e68b 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -300,11 +300,9 @@ impl Multiplexed { T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Error: Send + Sync, - M: StreamMuxer + Send + Sync + 'static, - M::Substream: Send + 'static, - M::OutboundSubstream: Send + 'static, + M: StreamMuxer + Send + 'static, { - boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) + boxed(self.map(|(i, m), _| (i, Box::new(m) as StreamMuxerBox))) } /// Adds a timeout to the setup and protocol upgrade process for all diff --git a/core/tests/util.rs b/core/tests/util.rs index 1d68f1fc501a..caeba49c2d4e 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -31,8 +31,8 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match std::mem::replace(&mut self.state, CloseMuxerState::Done) { - CloseMuxerState::Close(muxer) => { - if !muxer.close(cx)?.is_ready() { + CloseMuxerState::Close(mut muxer) => { + if !muxer.poll_close(cx)?.is_ready() { self.state = CloseMuxerState::Close(muxer); return Poll::Pending; } diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index af84e6639e76..f8a8254806e4 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,6 +1,9 @@ # 0.33.0 [unreleased] - Update to `libp2p-core` `v0.33.0`. +- Remove `Substream` from the public API. See [PR 2648]. + +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.32.0 [2022-02-22] diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index a9704b8e7677..d9d333f3ad8d 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -27,8 +27,7 @@ use futures::channel::oneshot; use futures::future::poll_fn; use futures::prelude::*; use libp2p_core::{ - identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, StreamMuxer, - Transport, + identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, Transport, }; use libp2p_mplex as mplex; use libp2p_plaintext::PlainText2Config; @@ -103,27 +102,24 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd addr_sender.take().unwrap().send(a).unwrap(); } transport::ListenerEvent::Upgrade { upgrade, .. } => { - let (_peer, conn) = upgrade.await.unwrap(); - match poll_fn(|cx| conn.poll_event(cx)).await { - Ok(muxing::StreamMuxerEvent::InboundSubstream(mut s)) => { - let mut buf = vec![0u8; payload_len]; - let mut off = 0; - loop { - // Read in typical chunk sizes of up to 8KiB. - let end = off + std::cmp::min(buf.len() - off, 8 * 1024); - let n = poll_fn(|cx| { - conn.read_substream(cx, &mut s, &mut buf[off..end]) - }) - .await - .unwrap(); - off += n; - if off == buf.len() { - return; - } - } + let (_peer, mut conn) = upgrade.await.unwrap(); + + let mut s = poll_fn(|cx| conn.poll(cx)) + .await + .expect("unexpected error") + .into_inbound_substream() + .expect("Unexpected muxer event"); + + let mut buf = vec![0u8; payload_len]; + let mut off = 0; + loop { + // Read in typical chunk sizes of up to 8KiB. + let end = off + std::cmp::min(buf.len() - off, 8 * 1024); + let n = s.read(&mut buf[off..end]).await.unwrap(); + off += n; + if off == buf.len() { + return; } - Ok(_) => panic!("Unexpected muxer event"), - Err(e) => panic!("Unexpected error: {:?}", e), } } _ => panic!("Unexpected listener event"), @@ -134,24 +130,16 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd // Spawn and block on the sender, i.e. until all data is sent. task::block_on(async move { let addr = addr_receiver.await.unwrap(); - let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); - let mut handle = conn.open_outbound(); - let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)) + let (_peer, mut conn) = transport.dial(addr).unwrap().await.unwrap(); + conn.open_outbound(); + let (mut stream, _) = poll_fn(|cx| conn.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); - let mut off = 0; - loop { - let n = poll_fn(|cx| conn.write_substream(cx, &mut stream, &payload[off..])) - .await - .unwrap(); - off += n; - if off == payload.len() { - poll_fn(|cx| conn.flush_substream(cx, &mut stream)) - .await - .unwrap(); - return; - } - } + + stream.write_all(&payload).await.unwrap(); + stream.flush().await.unwrap(); }); // Wait for all data to be received. diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index cd0fce61f665..ae94176113f2 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -27,12 +27,16 @@ pub use config::{MaxBufferBehaviour, MplexConfig}; use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; +use libp2p_core::muxing::OutboundSubstreamId; +use libp2p_core::muxing::{AsyncReadWriteBox, OutboundSubstreams}; use libp2p_core::{ muxing::StreamMuxerEvent, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, StreamMuxer, }; use parking_lot::Mutex; +use std::pin::Pin; +use std::sync::Arc; use std::{cmp, iter, task::Context, task::Poll}; impl UpgradeInfo for MplexConfig { @@ -46,30 +50,38 @@ impl UpgradeInfo for MplexConfig { impl InboundUpgrade for MplexConfig where - C: AsyncRead + AsyncWrite + Unpin, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = Multiplex; + type Output = Multiplex; type Error = io::Error; type Future = future::Ready>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new( + AsyncReadWriteBox::new(socket), + self, + ))), + outbound_substreams: OutboundSubstreams::default(), })) } } impl OutboundUpgrade for MplexConfig where - C: AsyncRead + AsyncWrite + Unpin, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = Multiplex; + type Output = Multiplex; type Error = io::Error; type Future = future::Ready>; fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new( + AsyncReadWriteBox::new(socket), + self, + ))), + outbound_substreams: OutboundSubstreams::default(), })) } } @@ -78,116 +90,125 @@ where /// /// This implementation isn't capable of detecting when the underlying socket changes its address, /// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. -pub struct Multiplex { - io: Mutex>, +pub struct Multiplex { + io: Arc>>, + outbound_substreams: OutboundSubstreams, } -impl StreamMuxer for Multiplex -where - C: AsyncRead + AsyncWrite + Unpin, -{ - type Substream = Substream; - type OutboundSubstream = OutboundSubstream; - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll>> { - let stream_id = ready!(self.io.lock().poll_next_stream(cx))?; - let stream = Substream::new(stream_id); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) +impl Multiplex { + fn poll_inbound(&mut self, cx: &mut Context<'_>) -> Poll> { + let mplex_id = ready!(self.io.lock().poll_next_stream(cx))?; + let substream = AsyncReadWriteBox::new(Substream::new(mplex_id, self.io.clone())); + + Poll::Ready(Ok(substream)) } +} - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} +impl StreamMuxer for Multiplex { + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.outbound_substreams.open_new() } - fn poll_outbound( - &self, - cx: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - let stream_id = ready!(self.io.lock().poll_open_stream(cx))?; - Poll::Ready(Ok(Substream::new(stream_id))) + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready((mplex_id, outbound_id)) = self + .outbound_substreams + .poll(|| Pin::new(&mut self.io.lock()).poll_open_stream(cx))? + { + let substream = AsyncReadWriteBox::new(Substream::new(mplex_id, self.io.clone())); + + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream( + substream, + outbound_id, + ))); + } + + let substream = ready!(self.poll_inbound(cx))?; + + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.io.lock().poll_close(cx) } +} - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { - // Nothing to do, since `open_outbound` creates no new local state. +/// Active substream to the remote. +struct Substream { + /// The unique, local identifier of the substream. + id: LocalStreamId, + /// The current data frame the substream is reading from. + current_data: Bytes, + /// Shared reference to the actual muxer. + io: Arc>>, +} + +impl Substream { + fn new(id: LocalStreamId, io: Arc>>) -> Self { + Self { + id, + current_data: Bytes::default(), + io, + } } +} - fn read_substream( - &self, +impl AsyncRead for Substream { + fn poll_read( + self: Pin<&mut Self>, cx: &mut Context<'_>, - substream: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { + let this = self.get_mut(); + loop { // Try to read from the current (i.e. last received) frame. - if !substream.current_data.is_empty() { - let len = cmp::min(substream.current_data.len(), buf.len()); - buf[..len].copy_from_slice(&substream.current_data.split_to(len)); + if !this.current_data.is_empty() { + let len = cmp::min(this.current_data.len(), buf.len()); + buf[..len].copy_from_slice(&this.current_data.split_to(len)); return Poll::Ready(Ok(len)); } // Read the next data frame from the multiplexed stream. - match ready!(self.io.lock().poll_read_stream(cx, substream.id))? { + match ready!(this.io.lock().poll_read_stream(cx, this.id))? { Some(data) => { - substream.current_data = data; + this.current_data = data; } None => return Poll::Ready(Ok(0)), } } } +} - fn write_substream( - &self, +impl AsyncWrite for Substream { + fn poll_write( + self: Pin<&mut Self>, cx: &mut Context<'_>, - substream: &mut Self::Substream, buf: &[u8], - ) -> Poll> { - self.io.lock().poll_write_stream(cx, substream.id, buf) - } + ) -> Poll> { + let this = self.get_mut(); - fn flush_substream( - &self, - cx: &mut Context<'_>, - substream: &mut Self::Substream, - ) -> Poll> { - self.io.lock().poll_flush_stream(cx, substream.id) + this.io.lock().poll_write_stream(cx, this.id, buf) } - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - substream: &mut Self::Substream, - ) -> Poll> { - self.io.lock().poll_close_stream(cx, substream.id) - } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - fn destroy_substream(&self, sub: Self::Substream) { - self.io.lock().drop_stream(sub.id); + this.io.lock().poll_flush_stream(cx, this.id) } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.io.lock().poll_close(cx) - } -} + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let mut io = this.io.lock(); -/// Active attempt to open an outbound substream. -pub struct OutboundSubstream {} + ready!(io.poll_flush_stream(cx, this.id))?; + ready!(io.poll_close_stream(cx, this.id))?; -/// Active substream to the remote. -pub struct Substream { - /// The unique, local identifier of the substream. - id: LocalStreamId, - /// The current data frame the substream is reading from. - current_data: Bytes, + Poll::Ready(Ok(())) + } } -impl Substream { - fn new(id: LocalStreamId) -> Self { - Self { - id, - current_data: Bytes::new(), - } +impl Drop for Substream { + fn drop(&mut self) { + self.io.lock().drop_stream(self.id); } } diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 2069bf0b8f69..6ccb1dc22c6a 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -18,10 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::future::poll_fn; use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{muxing, upgrade, Transport}; +use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; -use std::sync::Arc; #[test] fn async_write() { @@ -49,7 +49,7 @@ fn async_write() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -60,8 +60,11 @@ fn async_write() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = vec![0u8; 11]; @@ -74,9 +77,9 @@ fn async_write() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index f4c7fd23dc1b..b4d23f2eaf5f 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -18,10 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::future::poll_fn; use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{muxing, upgrade, Transport}; +use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; -use std::sync::Arc; #[test] fn client_to_server_outbound() { @@ -49,7 +49,7 @@ fn client_to_server_outbound() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -60,12 +60,15 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); - let mut buf = Vec::new(); - outbound.read_to_end(&mut buf).await.unwrap(); + let mut buf = vec![0u8; 11]; + outbound.read_exact(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); }); @@ -74,9 +77,9 @@ fn client_to_server_outbound() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() @@ -117,21 +120,19 @@ fn client_to_server_inbound() { tx.send(addr).unwrap(); - let client = Arc::new( - listener - .next() - .await - .unwrap() - .unwrap() - .into_upgrade() - .unwrap() - .0 - .await - .unwrap(), - ); + let mut client = listener + .next() + .await + .unwrap() + .unwrap() + .into_upgrade() + .unwrap() + .0 + .await + .unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() @@ -150,9 +151,13 @@ fn client_to_server_inbound() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -185,7 +190,7 @@ fn protocol_not_match() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -196,8 +201,11 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = vec![0u8; 11]; diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index e49a88458851..f4b1677f2840 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,6 +1,9 @@ # 0.37.0 [unreleased] - Update to `libp2p-core` `v0.33.0`. +- Remove `YamuxResult` and `YamuxError` from the public API. See [PR 2648]. + +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.36.0 [2022-02-22] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index f473ad567860..7800e5f3e59b 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -27,9 +27,10 @@ use futures::{ ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::{ + AsyncReadWriteBox, OutboundSubstreamId, OutboundSubstreams, StreamMuxer, StreamMuxerEvent, +}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use parking_lot::Mutex; use std::{ fmt, io, iter, pin::Pin, @@ -37,7 +38,14 @@ use std::{ }; /// A Yamux connection. -pub struct Yamux(Mutex>); +pub struct Yamux { + /// The [`futures::stream::Stream`] of incoming substreams. + incoming: S, + /// Handle to control the connection. + control: yamux::Control, + /// Controls the state handling around opening new outbound substreams. + outbound_substreams: OutboundSubstreams, +} impl fmt::Debug for Yamux { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -45,13 +53,6 @@ impl fmt::Debug for Yamux { } } -struct Inner { - /// The [`futures::stream::Stream`] of incoming substreams. - incoming: S, - /// Handle to control the connection. - control: yamux::Control, -} - /// A token to poll for an outbound substream. #[derive(Debug)] pub struct OpenSubstreamToken(()); @@ -64,15 +65,15 @@ where fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: Incoming { stream: yamux::into_stream(conn).err_into().boxed(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - - Yamux(Mutex::new(inner)) + outbound_substreams: OutboundSubstreams::default(), + } } } @@ -84,15 +85,15 @@ where fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: LocalIncoming { stream: yamux::into_stream(conn).err_into().boxed_local(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - - Yamux(Mutex::new(inner)) + outbound_substreams: OutboundSubstreams::default(), + } } } @@ -101,80 +102,32 @@ impl StreamMuxer for Yamux where S: Stream> + Unpin, { - type Substream = yamux::Stream; - type OutboundSubstream = OpenSubstreamToken; - - fn poll_event( - &self, - c: &mut Context<'_>, - ) -> Poll>> { - let mut inner = self.0.lock(); - match ready!(inner.incoming.poll_next_unpin(c)) { - Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(to_io_error(e))), - None => Poll::Ready(Err(to_io_error(yamux::ConnectionError::Closed))), - } + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.outbound_substreams.open_new() } - fn open_outbound(&self) -> Self::OutboundSubstream { - OpenSubstreamToken(()) - } - - fn poll_outbound( - &self, - c: &mut Context<'_>, - _: &mut OpenSubstreamToken, - ) -> Poll> { - let mut inner = self.0.lock(); - Pin::new(&mut inner.control) - .poll_open_stream(c) - .map_err(to_io_error) - } - - fn destroy_outbound(&self, _: Self::OutboundSubstream) { - self.0.lock().control.abort_open_stream() - } - - fn read_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - b: &mut [u8], - ) -> Poll> { - Pin::new(s).poll_read(c, b) - } + fn poll(&mut self, c: &mut Context<'_>) -> Poll> { + if let Poll::Ready((stream, id)) = self + .outbound_substreams + .poll(|| Pin::new(&mut self.control).poll_open_stream(c)) + .map_err(to_io_error)? + { + let substream = AsyncReadWriteBox::new(stream); - fn write_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - b: &[u8], - ) -> Poll> { - Pin::new(s).poll_write(c, b) - } + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream, id))); + } - fn flush_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_flush(c) - } + let event = ready!(self.incoming.poll_next_unpin(c)) + .transpose() + .map_err(to_io_error)? + .map(|stream| StreamMuxerEvent::InboundSubstream(AsyncReadWriteBox::new(stream))) + .ok_or_else(|| to_io_error(yamux::ConnectionError::Closed))?; - fn shutdown_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_close(c) + Poll::Ready(Ok(event)) } - fn destroy_substream(&self, _: Self::Substream) {} - - fn poll_close(&self, c: &mut Context<'_>) -> Poll> { - let mut inner = self.0.lock(); - - if let Poll::Ready(()) = Pin::new(&mut inner.control) + fn poll_close(&mut self, c: &mut Context<'_>) -> Poll> { + if let Poll::Ready(()) = Pin::new(&mut self.control) .poll_close(c) .map_err(to_io_error)? { @@ -182,7 +135,7 @@ where } while let Poll::Ready(_inbound_stream) = - inner.incoming.poll_next_unpin(c).map_err(to_io_error)? + self.incoming.poll_next_unpin(c).map_err(to_io_error)? { match _inbound_stream { Some(_) => {} // drop inbound stream diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 8786d61c2f15..555a5476bab9 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -22,7 +22,6 @@ use async_trait::async_trait; use futures::stream::FusedStream; use futures::StreamExt; use futures::{future, Stream}; -use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::upgrade::Version; use libp2p::core::transport::MemoryTransport; use libp2p::core::upgrade::SelectUpgrade; diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 42c29239cdc6..d7efe653bb77 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -32,17 +32,19 @@ pub use error::{ pub use listeners::{ListenersEvent, ListenersStream}; pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; -pub use substream::{Close, Substream, SubstreamEndpoint}; +pub use substream::SubstreamEndpoint; use crate::handler::ConnectionHandler; +use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::muxing::{OutboundSubstreamId, StreamMuxerBox, StreamMuxerEvent}; use libp2p_core::upgrade; use libp2p_core::PeerId; -use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; -use substream::{Muxing, SubstreamEvent}; +use std::collections::HashMap; +use std::future::Future; +use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -68,7 +70,11 @@ where THandler: ConnectionHandler, { /// Node that handles the muxing. - muxing: substream::Muxing>, + muxing: StreamMuxerBox, + + /// Temporary storage of OutboundOpenInfo whilst the substreams are opening. + open_info: HashMap>, + /// Handler that processes substreams. handler: HandlerWrapper, } @@ -76,10 +82,11 @@ where impl fmt::Debug for Connection where THandler: ConnectionHandler + fmt::Debug, + handler_wrapper::OutboundOpenInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") - .field("muxing", &self.muxing) + .field("open_info", &self.open_info) .field("handler", &self.handler) .finish() } @@ -100,7 +107,8 @@ where ) -> Self { let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override); Connection { - muxing: Muxing::new(muxer), + muxing: muxer, + open_info: HashMap::default(), handler: wrapped_handler, } } @@ -112,10 +120,10 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (THandler, Close) { + pub fn close(mut self) -> (THandler, impl Future>) { ( self.handler.into_connection_handler(), - self.muxing.close().0, + poll_fn(move |cx| self.muxing.poll_close(cx)), ) } @@ -130,7 +138,8 @@ where match self.handler.poll(cx) { Poll::Pending => {} Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { - self.muxing.open_substream(user_data); + let id = self.muxing.open_outbound(); + self.open_info.insert(id, user_data); continue; } Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { @@ -143,20 +152,19 @@ where // of new substreams. match self.muxing.poll(cx) { Poll::Pending => {} - Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => { + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); continue; } - Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })) => { + Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream, id))) => { + let user_data = self.open_info.remove(&id).expect("unknown substream id"); let endpoint = SubstreamEndpoint::Dialer(user_data); + self.handler.inject_substream(substream, endpoint); continue; } - Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { + Poll::Ready(Ok(StreamMuxerEvent::AddressChange(address))) => { self.handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index b10077d41b34..3bd8c66a1ac9 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::{Substream, SubstreamEndpoint}; +use crate::connection::SubstreamEndpoint; use crate::handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, }; @@ -28,8 +28,8 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; use futures_timer::Delay; use instant::Instant; +use libp2p_core::muxing::AsyncReadWriteBox; use libp2p_core::{ - muxing::StreamMuxerBox, upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, Multiaddr, }; @@ -53,7 +53,7 @@ where SubstreamUpgrade< TConnectionHandler::InboundOpenInfo, InboundUpgradeApply< - Substream, + AsyncReadWriteBox, SendWrapper, >, >, @@ -63,7 +63,7 @@ where SubstreamUpgrade< TConnectionHandler::OutboundOpenInfo, OutboundUpgradeApply< - Substream, + AsyncReadWriteBox, SendWrapper, >, >, @@ -236,7 +236,7 @@ where { pub fn inject_substream( &mut self, - substream: Substream, + substream: AsyncReadWriteBox, // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). endpoint: SubstreamEndpoint>, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8186154c9b39..e16a043189c6 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -38,7 +38,7 @@ use futures::{ stream::FuturesUnordered, }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; +use libp2p_core::muxing::StreamMuxerBox; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -598,7 +598,7 @@ where match event { task::PendingConnectionEvent::ConnectionEstablished { id, - output: (obtained_peer_id, muxer), + output: (obtained_peer_id, mut muxer), outgoing, } => { let PendingConnectionInfo { diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index b7e3d3bad872..f543a94f0b1c 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -18,240 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::prelude::*; -use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{substream_from_ref, StreamMuxer, StreamMuxerEvent, SubstreamRef}; -use smallvec::SmallVec; -use std::sync::Arc; -use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll}; - /// Endpoint for a received substream. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum SubstreamEndpoint { Dialer(TDialInfo), Listener, } - -/// Implementation of `Stream` that handles substream multiplexing. -/// -/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying -/// the `Muxing` will **not** close the existing substreams. -/// -/// The stream will close once both the inbound and outbound channels are closed, and no more -/// outbound substream attempt is pending. -pub struct Muxing -where - TMuxer: StreamMuxer, -{ - /// The muxer used to manage substreams. - inner: Arc, - /// List of substreams we are currently opening. - outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, -} - -/// Future that signals the remote that we have closed the connection. -pub struct Close { - /// Muxer to close. - muxer: Arc, -} - -/// A successfully opened substream. -pub type Substream = SubstreamRef>; - -/// Event that can happen on the `Muxing`. -pub enum SubstreamEvent -where - TMuxer: StreamMuxer, -{ - /// A new inbound substream arrived. - InboundSubstream { - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: Substream, - }, - - /// An outbound substream has successfully been opened. - OutboundSubstream { - /// User data that has been passed to the `open_substream` method. - user_data: TUserData, - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: Substream, - }, - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -/// Identifier for a substream being opened. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct OutboundSubstreamId(usize); - -impl Muxing -where - TMuxer: StreamMuxer, -{ - /// Creates a new node events stream. - pub fn new(muxer: TMuxer) -> Self { - Muxing { - inner: Arc::new(muxer), - outbound_substreams: SmallVec::new(), - } - } - - /// Starts the process of opening a new outbound substream. - /// - /// After calling this method, polling the stream should eventually produce either an - /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has - /// been passed to this method. - pub fn open_substream(&mut self, user_data: TUserData) { - let raw = self.inner.open_outbound(); - self.outbound_substreams.push((user_data, raw)); - } - - /// Destroys the node stream and returns all the pending outbound substreams, plus an object - /// that signals the remote that we shut down the connection. - #[must_use] - pub fn close(mut self) -> (Close, Vec) { - let substreams = self.cancel_outgoing(); - let close = Close { - muxer: self.inner.clone(), - }; - (close, substreams) - } - - /// Destroys all outbound streams and returns the corresponding user data. - pub fn cancel_outgoing(&mut self) -> Vec { - let mut out = Vec::with_capacity(self.outbound_substreams.len()); - for (user_data, outbound) in self.outbound_substreams.drain(..) { - out.push(user_data); - self.inner.destroy_outbound(outbound); - } - out - } - - /// Provides an API similar to `Future`. - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, IoError>> { - // Polling inbound substream. - match self.inner.poll_event(cx) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { - let substream = substream_from_ref(self.inner.clone(), substream); - return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); - } - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { - return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => {} - } - - // Polling outbound substreams. - // We remove each element from `outbound_substreams` one by one and add them back. - for n in (0..self.outbound_substreams.len()).rev() { - let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n); - match self.inner.poll_outbound(cx, &mut outbound) { - Poll::Ready(Ok(substream)) => { - let substream = substream_from_ref(self.inner.clone(), substream); - self.inner.destroy_outbound(outbound); - return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })); - } - Poll::Pending => { - self.outbound_substreams.push((user_data, outbound)); - } - Poll::Ready(Err(err)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err)); - } - } - } - - // Nothing happened. Register our task to be notified and return. - Poll::Pending - } -} - -impl fmt::Debug for Muxing -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Muxing") - .field("outbound_substreams", &self.outbound_substreams.len()) - .finish() - } -} - -impl Drop for Muxing -where - TMuxer: StreamMuxer, -{ - fn drop(&mut self) { - // The substreams that were produced will continue to work, as the muxer is held in an Arc. - // However we will no longer process any further inbound or outbound substream, and we - // therefore close everything. - for (_, outbound) in self.outbound_substreams.drain(..) { - self.inner.destroy_outbound(outbound); - } - } -} - -impl Future for Close -where - TMuxer: StreamMuxer, -{ - type Output = Result<(), IoError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.muxer.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - } - } -} - -impl fmt::Debug for Close -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Close").finish() - } -} - -impl fmt::Debug for SubstreamEvent -where - TMuxer: StreamMuxer, - TMuxer::Substream: fmt::Debug, - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SubstreamEvent::InboundSubstream { substream } => f - .debug_struct("SubstreamEvent::OutboundClosed") - .field("substream", substream) - .finish(), - SubstreamEvent::OutboundSubstream { - user_data, - substream, - } => f - .debug_struct("SubstreamEvent::OutboundSubstream") - .field("user_data", user_data) - .field("substream", substream) - .finish(), - SubstreamEvent::AddressChange(address) => f - .debug_struct("SubstreamEvent::AddressChange") - .field("address", address) - .finish(), - } - } -} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 461e8c7d13f9..3bee2056ba71 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -79,11 +79,12 @@ pub use handler::{ pub use registry::{AddAddressResult, AddressRecord, AddressScore}; use connection::pool::{Pool, PoolConfig, PoolEvent}; -use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream, Substream}; +use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream}; use dial_opts::{DialOpts, PeerCondition}; use either::Either; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::{ConnectionId, PendingPoint}; +use libp2p_core::muxing::AsyncReadWriteBox; use libp2p_core::{ connection::{ConnectedPoint, ListenerId}, multiaddr::Protocol, @@ -110,7 +111,7 @@ use upgrade::UpgradeInfoSend as _; /// /// Implements the [`AsyncRead`](futures::io::AsyncRead) and /// [`AsyncWrite`](futures::io::AsyncWrite) traits. -pub type NegotiatedSubstream = Negotiated>; +pub type NegotiatedSubstream = Negotiated; /// Event generated by the [`NetworkBehaviour`] that the swarm will report back. type TBehaviourOutEvent = ::OutEvent;