diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index bebe025d468..da88ce9aa55 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -4,12 +4,13 @@ - Remove `StreamMuxer::flush_all`. See [PR 2669]. - Rename `StreamMuxer::close` to `StreamMuxer::poll_close`. See [PR 2666]. - Remove deprecated function `StreamMuxer::is_remote_acknowledged`. See [PR 2665]. +- Re-design `StreamMuxer` trait to take `&mut self` and remove all associated types but `Substream`. See [PR 2648]. [PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529 [PR 2666]: https://github.com/libp2p/rust-libp2p/pull/2666 [PR 2665]: https://github.com/libp2p/rust-libp2p/pull/2665 [PR 2669]: https://github.com/libp2p/rust-libp2p/pull/2669 - +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.32.1 diff --git a/core/src/either.rs b/core/src/either.rs index e1c5316fd6a..1cbf72b5193 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - muxing::{StreamMuxer, StreamMuxerEvent}, + muxing::{OutboundSubstreamId, StreamMuxer, StreamMuxerEvent}, transport::{ListenerEvent, Transport, TransportError}, Multiaddr, ProtocolName, }; @@ -202,165 +202,36 @@ where B: StreamMuxer, { type Substream = EitherOutput; - type OutboundSubstream = EitherOutbound; - type Error = io::Error; - fn poll_event( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self { - EitherOutput::First(inner) => inner.poll_event(cx).map(|result| { - result.map_err(|e| e.into()).map(|event| match event { - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream)) - } - }) - }), - EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| { - result.map_err(|e| e.into()).map(|event| match event { - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream)) - } - }) - }), - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { + ) -> Poll>> { match self { - EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()), - EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()), - } - } - - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)) - .map_err(|e| e.into()), - (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)) - .map_err(|e| e.into()), - _ => panic!("Wrong API usage"), - } - } - - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - match self { - EitherOutput::First(inner) => match substream { - EitherOutbound::A(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutbound::B(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - } - } - - fn read_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.read_substream(cx, sub, buf).map_err(|e| e.into()) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.read_substream(cx, sub, buf).map_err(|e| e.into()) - } - _ => panic!("Wrong API usage"), - } - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.write_substream(cx, sub, buf).map_err(|e| e.into()) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.write_substream(cx, sub, buf).map_err(|e| e.into()) - } - _ => panic!("Wrong API usage"), - } - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.flush_substream(cx, sub).map_err(|e| e.into()) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.flush_substream(cx, sub).map_err(|e| e.into()) + EitherOutput::First(inner) => { + inner.poll(cx).map_ok(|e| e.map_stream(EitherOutput::First)) } - _ => panic!("Wrong API usage"), + EitherOutput::Second(inner) => inner + .poll(cx) + .map_ok(|e| e.map_stream(EitherOutput::Second)), } } - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - sub: &mut Self::Substream, - ) -> Poll> { - match (self, sub) { - (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.shutdown_substream(cx, sub).map_err(|e| e.into()) - } - (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.shutdown_substream(cx, sub).map_err(|e| e.into()) - } - _ => panic!("Wrong API usage"), - } - } - - fn destroy_substream(&self, substream: Self::Substream) { + fn open_outbound(&mut self) -> OutboundSubstreamId { match self { - EitherOutput::First(inner) => match substream { - EitherOutput::First(substream) => inner.destroy_substream(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutput::Second(substream) => inner.destroy_substream(substream), - _ => panic!("Wrong API usage"), - }, + EitherOutput::First(inner) => inner.open_outbound(), + EitherOutput::Second(inner) => inner.open_outbound(), } } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()), - EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()), + EitherOutput::First(inner) => inner.poll_close(cx), + EitherOutput::Second(inner) => inner.poll_close(cx), } } } -#[derive(Debug, Copy, Clone)] -#[must_use = "futures do nothing unless polled"] -pub enum EitherOutbound { - A(A::OutboundSubstream), - B(B::OutboundSubstream), -} - /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. #[pin_project(project = EitherListenStreamProj)] #[derive(Debug, Copy, Clone)] diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 969c56c782e..847a193b832 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -21,8 +21,7 @@ //! Muxing is the process of splitting a connection into multiple substreams. //! //! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer` -//! has ownership of a connection, lets you open and close substreams, and read/write data -//! on open substreams. +//! has ownership of a connection, lets you open and close substreams. //! //! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this //! > is managed by the library's internals. @@ -51,166 +50,52 @@ //! The upgrade process will take ownership of the connection, which makes it possible for the //! implementation of `StreamMuxer` to control everything that happens on the wire. -use futures::{future, prelude::*, task::Context, task::Poll}; +use futures::{task::Context, task::Poll}; use multiaddr::Multiaddr; -use std::{fmt, io, ops::Deref, pin::Pin}; +use std::collections::VecDeque; +use std::io; pub use self::boxed::StreamMuxerBox; +pub use self::boxed::SubstreamBox; pub use self::singleton::SingletonMuxer; mod boxed; mod singleton; -/// Implemented on objects that can open and manage substreams. -/// -/// The state of a muxer, as exposed by this API, is the following: -/// -/// - A connection to the remote. The `poll_event`, `flush_all` and `close` methods operate -/// on this. -/// - A list of substreams that are open. The `poll_outbound`, `read_substream`, `write_substream`, -/// `flush_substream`, `shutdown_substream` and `destroy_substream` methods allow controlling -/// these entries. -/// - A list of outbound substreams being opened. The `open_outbound`, `poll_outbound` and -/// `destroy_outbound` methods allow controlling these entries. +/// Provides multiplexing for a connection by allowing users to open substreams. /// +/// A [`StreamMuxer`] is a stateful object and needs to be continuously polled to make progress. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream; - /// Future that will be resolved when the outgoing substream is open. - type OutboundSubstream; - - /// Error type of the muxer - type Error: Into; - - /// Polls for a connection-wide event. - /// - /// This function behaves the same as a `Stream`. - /// - /// If `Pending` is returned, then the current task will be notified once the muxer - /// is ready to be polled, similar to the API of `Stream::poll()`. - /// Only the latest task that was used to call this method may be notified. + /// Polls the [`StreamMuxer`] to make progress. /// /// It is permissible and common to use this method to perform background /// work, such as processing incoming packets and polling timers. /// /// An error can be generated if the connection has been closed. - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; - - /// Opens a new outgoing substream, and produces the equivalent to a future that will be - /// resolved when it becomes available. - /// - /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced - /// through the methods on the `StreamMuxer` trait. - fn open_outbound(&self) -> Self::OutboundSubstream; - - /// Polls the outbound substream. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be polled, similar to the API of `Future::poll()`. - /// However, for each individual outbound substream, only the latest task that was used to - /// call this method may be notified. - /// - /// May panic or produce an undefined result if an earlier polling of the same substream - /// returned `Ready` or `Err`. - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll>; - - /// Destroys an outbound substream future. Use this after the outbound substream has finished, - /// or if you want to interrupt it. - fn destroy_outbound(&self, s: Self::OutboundSubstream); + fn poll(&mut self, cx: &mut Context<'_>) + -> Poll>>; - /// Reads data from a substream. The behaviour is the same as `futures::AsyncRead::poll_read`. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. However, for each individual substream, only the latest task that - /// was used to call this method may be notified. - /// - /// If `Async::Ready(0)` is returned, the substream has been closed by the remote and should - /// no longer be read afterwards. - /// - /// An error can be generated if the connection has been closed, or if a protocol misbehaviour - /// happened. - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll>; - - /// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. For each individual substream, only the latest task that was used to - /// call this method may be notified. - /// - /// Calling `write_substream` does not guarantee that data will arrive to the remote. To - /// ensure that, you should call `flush_substream`. + /// Instruct this [`StreamMuxer`] to open a new outbound substream. /// - /// It is incorrect to call this method on a substream if you called `shutdown_substream` on - /// this substream earlier. - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll>; + /// This function returns instantly with a new ID but the substream has not been opened at that + /// point! When the substream is ready to be used, you will receive a + /// [`StreamMuxerEvent::OutboundSubstream`] event with the same ID as returned here. + fn open_outbound(&mut self) -> OutboundSubstreamId; - /// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`. - /// - /// After this method has been called, data written earlier on the substream is guaranteed to - /// be received by the remote. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be read. For each individual substream, only the latest task that was used to - /// call this method may be notified. - /// - /// > **Note**: This method may be implemented as a call to `flush_all`. - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll>; - - /// Attempts to shut down the writing side of a substream. The behaviour is similar to - /// `AsyncWrite::poll_close`. - /// - /// Contrary to `AsyncWrite::poll_close`, shutting down a substream does not imply - /// `flush_substream`. If you want to make sure that the remote is immediately informed about - /// the shutdown, use `flush_substream` or `flush_all`. - /// - /// After this method has been called, you should no longer attempt to write to this substream. - /// - /// An error can be generated if the connection has been closed, or if a protocol misbehaviour - /// happened. - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll>; - - /// Destroys a substream. - fn destroy_substream(&self, s: Self::Substream); - - /// Closes this `StreamMuxer`. + /// Closes this [`StreamMuxer`]. /// /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns, /// or polls must generate an error or be ignored. /// - /// Calling this method implies `flush_all`. - /// /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. - fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll>; } /// Event about a connection, reported by an implementation of [`StreamMuxer`]. @@ -219,6 +104,10 @@ pub enum StreamMuxerEvent { /// Remote has opened a new substream. Contains the substream in question. InboundSubstream(T), + /// We have opened a new substream. Contains the substream and the ID returned + /// from [`StreamMuxer::open_outbound`]. + OutboundSubstream(T, OutboundSubstreamId), + /// Address to the remote has changed. The previous one is now obsolete. /// /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes @@ -236,253 +125,82 @@ impl StreamMuxerEvent { None } } -} -/// Polls for an event from the muxer and, if an inbound substream, wraps this substream in an -/// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. -pub fn event_from_ref_and_wrap

( - muxer: P, -) -> impl Future>, ::Error>> -where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - let muxer2 = muxer.clone(); - future::poll_fn(move |cx| muxer.poll_event(cx)).map_ok(|event| match event { - StreamMuxerEvent::InboundSubstream(substream) => { - StreamMuxerEvent::InboundSubstream(substream_from_ref(muxer2, substream)) + /// If `self` is a [`StreamMuxerEvent::OutboundSubstream`], returns the content. Otherwise + /// returns `None`. + pub fn into_outbound_substream(self) -> Option<(T, OutboundSubstreamId)> { + if let StreamMuxerEvent::OutboundSubstream(s, id) = self { + Some((s, id)) + } else { + None } - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - }) -} - -/// Same as `outbound_from_ref`, but wraps the output in an object that -/// implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. -pub fn outbound_from_ref_and_wrap

(muxer: P) -> OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - let inner = outbound_from_ref(muxer); - OutboundSubstreamRefWrapFuture { inner } -} - -/// Future returned by `outbound_from_ref_and_wrap`. -pub struct OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - inner: OutboundSubstreamRefFuture

, -} - -impl

Future for OutboundSubstreamRefWrapFuture

-where - P: Deref + Clone, - P::Target: StreamMuxer, -{ - type Output = Result, ::Error>; + } - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Future::poll(Pin::new(&mut self.inner), cx) { - Poll::Ready(Ok(substream)) => { - let out = substream_from_ref(self.inner.muxer.clone(), substream); - Poll::Ready(Ok(out)) + /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] and + /// [`StreamMuxerEvent::OutboundSubstream`] to a new type. + pub fn map_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { + match self { + StreamMuxerEvent::InboundSubstream(stream) => { + StreamMuxerEvent::InboundSubstream(map(stream)) } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + StreamMuxerEvent::OutboundSubstream(stream, id) => { + StreamMuxerEvent::OutboundSubstream(map(stream), id) + } + StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), } } } -/// Builds a new future for an outbound substream, where the muxer is a reference. -pub fn outbound_from_ref

(muxer: P) -> OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - let outbound = muxer.open_outbound(); - OutboundSubstreamRefFuture { - muxer, - outbound: Some(outbound), - } -} - -/// Future returned by `outbound_from_ref`. -pub struct OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - muxer: P, - outbound: Option<::OutboundSubstream>, -} - -impl

Unpin for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ -} - -impl

Future for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - type Output = Result<::Substream, ::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - this.muxer - .poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty")) - } -} - -impl

Drop for OutboundSubstreamRefFuture

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn drop(&mut self) { - self.muxer - .destroy_outbound(self.outbound.take().expect("outbound was empty")) - } -} +#[derive(Default, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct OutboundSubstreamId(u64); -/// Builds an implementation of `Read`/`Write`/`AsyncRead`/`AsyncWrite` from an `Arc` to the -/// muxer and a substream. -pub fn substream_from_ref

( - muxer: P, - substream: ::Substream, -) -> SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - SubstreamRef { - muxer, - substream: Some(substream), - shutdown_state: ShutdownState::Shutdown, - } -} +impl OutboundSubstreamId { + pub(crate) const ZERO: OutboundSubstreamId = OutboundSubstreamId(0); -/// Stream returned by `substream_from_ref`. -pub struct SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - muxer: P, - substream: Option<::Substream>, - shutdown_state: ShutdownState, -} + fn fetch_add(&mut self) -> Self { + let next_id = Self(self.0); -enum ShutdownState { - Shutdown, - Flush, - Done, -} + self.0 += 1; -impl

fmt::Debug for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, - ::Substream: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(f, "Substream({:?})", self.substream) + next_id } } -impl

Unpin for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ +/// Utility state machine for muxers to simplify the generation of new [`OutboundSubstreamId`]s +/// and the state handling around when new outbound substreams should be opened. +#[derive(Default)] +pub struct OutboundSubstreams { + current_id: OutboundSubstreamId, + substreams_to_open: VecDeque, } -impl

AsyncRead for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; +impl OutboundSubstreams { + /// Instruct this state machine to open a new outbound substream. + pub fn open_new(&mut self) -> OutboundSubstreamId { + let next_id = self.current_id.fetch_add(); + self.substreams_to_open.push_back(next_id); - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.read_substream(cx, s, buf).map_err(|e| e.into()) + next_id } -} -impl

AsyncWrite for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.write_substream(cx, s, buf).map_err(|e| e.into()) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - - let s = this.substream.as_mut().expect("substream was empty"); - loop { - match this.shutdown_state { - ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) { - Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - Poll::Pending => return Poll::Pending, - }, - ShutdownState::Flush => match this.muxer.flush_substream(cx, s) { - Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - Poll::Pending => return Poll::Pending, - }, - ShutdownState::Done => { - return Poll::Ready(Ok(())); - } - } + /// Allow this state machine to make progress. + /// + /// In case a new substream was requested, we invoke the provided poll function and return the + /// created substream along with its ID. + pub fn poll( + &mut self, + poll_open: impl FnOnce() -> Poll>, + ) -> Poll> { + if self.substreams_to_open.is_empty() { + return Poll::Pending; } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // We use a `this` because the compiler isn't smart enough to allow mutably borrowing - // multiple different fields from the `Pin` at the same time. - let this = &mut *self; - let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.flush_substream(cx, s).map_err(|e| e.into()) - } -} + let stream = futures::ready!(poll_open())?; + let id = self + .substreams_to_open + .pop_front() + .expect("we checked that we are not empty"); -impl

Drop for SubstreamRef

-where - P: Deref, - P::Target: StreamMuxer, -{ - fn drop(&mut self) { - self.muxer - .destroy_substream(self.substream.take().expect("substream was empty")) + Poll::Ready(Ok((stream, id))) } } diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 1284febe5dd..cf36237cf3e 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,171 +1,63 @@ -use crate::muxing::StreamMuxerEvent; +use crate::muxing::{OutboundSubstreamId, StreamMuxerEvent}; use crate::StreamMuxer; -use fnv::FnvHashMap; -use parking_lot::Mutex; -use std::io; -use std::sync::atomic::{AtomicUsize, Ordering}; +use futures::{AsyncRead, AsyncWrite, Stream}; +use std::io::{IoSlice, IoSliceMut}; +use std::pin::Pin; use std::task::{Context, Poll}; +use std::{fmt, io}; -/// Abstract `StreamMuxer`. +/// Abstract [`StreamMuxer`] with [`SubstreamBox`] as its `Substream` type. pub struct StreamMuxerBox { - inner: Box< - dyn StreamMuxer - + Send - + Sync, - >, + inner: Box + Send + 'static>, } -struct Wrap -where - T: StreamMuxer, -{ +/// Abstract type for asynchronous reading and writing. +/// +/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead` +/// and `AsyncWrite` capabilities. +pub struct SubstreamBox(Box); + +/// Helper type to abstract away the concrete [`Substream`](StreamMuxer::Substream) type of a [`StreamMuxer`]. +/// +/// Within this implementation, we map the concrete [`Substream`](StreamMuxer::Substream) to a [`SubstreamBox`]. +/// By parameterizing the entire [`Wrap`] type, we can stuff it into a [`Box`] and erase the concrete type. +struct Wrap { inner: T, - substreams: Mutex>, - next_substream: AtomicUsize, - outbound: Mutex>, - next_outbound: AtomicUsize, } impl StreamMuxer for Wrap where T: StreamMuxer, + T::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; - - #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let substream = match self.inner.poll_event(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { - return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) - } - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - }; - - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id))) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - let outbound = self.inner.open_outbound(); - let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); - self.outbound.lock().insert(id, outbound); - id - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut list = self.outbound.lock(); - let substream = match self - .inner - .poll_outbound(cx, list.get_mut(substream).unwrap()) - { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - }; - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(id)) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - let mut list = self.outbound.lock(); - self.inner - .destroy_outbound(list.remove(&substream).unwrap()) - } - - #[inline] - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .read_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) - } - - #[inline] - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .write_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) - } + type Substream = SubstreamBox; - #[inline] - fn flush_substream( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .flush_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) - } + ) -> Poll>> { + let event = futures::ready!(self.inner.poll(cx))?.map_stream(SubstreamBox::new); - #[inline] - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .shutdown_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) + Poll::Ready(Ok(event)) } - #[inline] - fn destroy_substream(&self, substream: Self::Substream) { - let mut list = self.substreams.lock(); - self.inner - .destroy_substream(list.remove(&substream).unwrap()) + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.inner.open_outbound() } - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(|e| e.into()) + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) } } impl StreamMuxerBox { - /// Turns a stream muxer into a `StreamMuxerBox`. + /// Construct a new [`StreamMuxerBox`]. pub fn new(muxer: T) -> StreamMuxerBox where - T: StreamMuxer + Send + Sync + 'static, - T::OutboundSubstream: Send, - T::Substream: Send, + T: StreamMuxer + Send + 'static, + T::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - let wrap = Wrap { - inner: muxer, - substreams: Mutex::new(Default::default()), - next_substream: AtomicUsize::new(0), - outbound: Mutex::new(Default::default()), - next_outbound: AtomicUsize::new(0), - }; + let wrap = Wrap { inner: muxer }; StreamMuxerBox { inner: Box::new(wrap), @@ -174,82 +66,102 @@ impl StreamMuxerBox { } impl StreamMuxer for StreamMuxerBox { - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; + type Substream = SubstreamBox; - #[inline] - fn poll_event( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.inner.poll_event(cx) + ) -> Poll>> { + self.inner.poll(cx) } - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { + fn open_outbound(&mut self) -> OutboundSubstreamId { self.inner.open_outbound() } - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll> { - self.inner.poll_outbound(cx, s) + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) } +} - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - self.inner.destroy_outbound(substream) +impl Stream for StreamMuxerBox { + type Item = io::Result::Substream>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().poll(cx).map(Some) + } +} + +impl SubstreamBox { + /// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`]. + pub fn new(stream: S) -> Self { + Self(Box::new(stream)) + } +} + +impl fmt::Debug for SubstreamBox { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SubstreamBox({})", self.0.type_name()) } +} + +/// Workaround because Rust does not allow `Box`. +trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { + /// Helper function to capture the erased inner type. + /// + /// Used to make the `Debug` implementation of `SubstreamBox` more useful. + fn type_name(&self) -> &'static str; +} - #[inline] - fn read_substream( - &self, +impl AsyncReadWrite for S +where + S: AsyncRead + AsyncWrite + Unpin, +{ + fn type_name(&self) -> &'static str { + std::any::type_name::() + } +} + +impl AsyncRead for SubstreamBox { + fn poll_read( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { - self.inner.read_substream(cx, s, buf) + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) } - #[inline] - fn write_substream( - &self, + fn poll_read_vectored( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - self.inner.write_substream(cx, s, buf) + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) } +} - #[inline] - fn flush_substream( - &self, +impl AsyncWrite for SubstreamBox { + fn poll_write( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.flush_substream(cx, s) + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) } - #[inline] - fn shutdown_substream( - &self, + fn poll_write_vectored( + self: Pin<&mut Self>, cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.shutdown_substream(cx, s) + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) } - #[inline] - fn destroy_substream(&self, s: Self::Substream) { - self.inner.destroy_substream(s) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) } - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_close(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 529fa147f9c..522f6042f46 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,20 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::muxing::OutboundSubstreamId; use crate::{ connection::Endpoint, muxing::{StreamMuxer, StreamMuxerEvent}, }; use futures::prelude::*; -use parking_lot::Mutex; -use std::{ - io, - pin::Pin, - sync::atomic::{AtomicBool, Ordering}, - task::Context, - task::Poll, -}; +use std::{io, task::Context, task::Poll}; /// Implementation of `StreamMuxer` that allows only one substream on top of a connection, /// yielding the connection itself. @@ -40,9 +34,7 @@ use std::{ /// Most notably, no protocol is negotiated. pub struct SingletonMuxer { /// The inner connection. - inner: Mutex, - /// If true, a substream has been produced and any further attempt should fail. - substream_extracted: AtomicBool, + inner: Option, /// Our local endpoint. Always the same value as was passed to `new`. endpoint: Endpoint, } @@ -54,102 +46,39 @@ impl SingletonMuxer { /// If `endpoint` is `Listener`, then only one inbound substream will be permitted. pub fn new(inner: TSocket, endpoint: Endpoint) -> Self { SingletonMuxer { - inner: Mutex::new(inner), - substream_extracted: AtomicBool::new(false), + inner: Some(inner), endpoint, } } } -/// Substream of the `SingletonMuxer`. -pub struct Substream {} -/// Outbound substream attempt of the `SingletonMuxer`. -pub struct OutboundSubstream {} - impl StreamMuxer for SingletonMuxer where - TSocket: AsyncRead + AsyncWrite + Unpin, + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Substream = Substream; - type OutboundSubstream = OutboundSubstream; - type Error = io::Error; - - fn poll_event( - &self, - _: &mut Context<'_>, - ) -> Poll, io::Error>> { - match self.endpoint { - Endpoint::Dialer => return Poll::Pending, - Endpoint::Listener => {} - } - - if !self.substream_extracted.swap(true, Ordering::Relaxed) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream {}))) - } else { - Poll::Pending - } + type Substream = TSocket; + + fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { + let stream = match self.inner.take() { + None => return Poll::Pending, + Some(stream) => stream, + }; + + let event = match self.endpoint { + Endpoint::Dialer => { + StreamMuxerEvent::OutboundSubstream(stream, OutboundSubstreamId::ZERO) + } + Endpoint::Listener => StreamMuxerEvent::InboundSubstream(stream), + }; + + Poll::Ready(Ok(event)) } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} - } - - fn poll_outbound( - &self, - _: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - match self.endpoint { - Endpoint::Listener => return Poll::Pending, - Endpoint::Dialer => {} - } - - if !self.substream_extracted.swap(true, Ordering::Relaxed) { - Poll::Ready(Ok(Substream {})) - } else { - Poll::Pending - } + fn open_outbound(&mut self) -> OutboundSubstreamId { + OutboundSubstreamId::ZERO } - fn destroy_outbound(&self, _: Self::OutboundSubstream) {} - - fn read_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf) - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut *self.inner.lock()), cx, buf) - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - ) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx) - } - - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - _: &mut Self::Substream, - ) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut *self.inner.lock()), cx) - } - - fn destroy_substream(&self, _: Self::Substream) {} - - fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 5508429754f..6cd1189ed1d 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -22,9 +22,10 @@ pub use crate::upgrade::Version; +use crate::muxing::StreamMuxerBox; use crate::{ connection::ConnectedPoint, - muxing::{StreamMuxer, StreamMuxerBox}, + muxing::StreamMuxer, transport::{ and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, TransportError, @@ -299,9 +300,8 @@ impl Multiplexed { T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Error: Send + Sync, - M: StreamMuxer + Send + Sync + 'static, - M::Substream: Send + 'static, - M::OutboundSubstream: Send + 'static, + M: StreamMuxer + Send + 'static, + M::Substream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) } diff --git a/core/tests/util.rs b/core/tests/util.rs index 7ca52188a52..1bb33f63bd4 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -2,7 +2,7 @@ use futures::prelude::*; use libp2p_core::muxing::StreamMuxer; -use std::{pin::Pin, task::Context, task::Poll}; +use std::{io, pin::Pin, task::Context, task::Poll}; pub struct CloseMuxer { state: CloseMuxerState, @@ -24,14 +24,13 @@ pub enum CloseMuxerState { impl Future for CloseMuxer where M: StreamMuxer, - M::Error: From, { - type Output = Result; + type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match std::mem::replace(&mut self.state, CloseMuxerState::Done) { - CloseMuxerState::Close(muxer) => { + CloseMuxerState::Close(mut muxer) => { if !muxer.poll_close(cx)?.is_ready() { self.state = CloseMuxerState::Close(muxer); return Poll::Pending; diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index ff52177529d..692d8d75cfa 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -103,8 +103,8 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd addr_sender.take().unwrap().send(a).unwrap(); } transport::ListenerEvent::Upgrade { upgrade, .. } => { - let (_peer, conn) = upgrade.await.unwrap(); - let mut s = poll_fn(|cx| conn.poll_event(cx)) + let (_peer, mut conn) = upgrade.await.unwrap(); + let mut s = poll_fn(|cx| conn.poll(cx)) .await .expect("unexpected error") .into_inbound_substream() @@ -115,9 +115,7 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd loop { // Read in typical chunk sizes of up to 8KiB. let end = off + std::cmp::min(buf.len() - off, 8 * 1024); - let n = poll_fn(|cx| conn.read_substream(cx, &mut s, &mut buf[off..end])) - .await - .unwrap(); + let n = s.read(&mut buf[off..end]).await.unwrap(); off += n; if off == buf.len() { return; @@ -132,24 +130,16 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd // Spawn and block on the sender, i.e. until all data is sent. task::block_on(async move { let addr = addr_receiver.await.unwrap(); - let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); - let mut handle = conn.open_outbound(); - let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)) + let (_peer, mut conn) = transport.dial(addr).unwrap().await.unwrap(); + conn.open_outbound(); + let (mut stream, _) = poll_fn(|cx| conn.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); - let mut off = 0; - loop { - let n = poll_fn(|cx| conn.write_substream(cx, &mut stream, &payload[off..])) - .await - .unwrap(); - off += n; - if off == payload.len() { - poll_fn(|cx| conn.flush_substream(cx, &mut stream)) - .await - .unwrap(); - return; - } - } + + stream.write_all(&payload).await.unwrap(); + stream.flush().await.unwrap(); }); // Wait for all data to be received. diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index f2a95c21151..50d9c812ce0 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -28,12 +28,14 @@ use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; use libp2p_core::{ + muxing::OutboundSubstreamId, + muxing::OutboundSubstreams, muxing::StreamMuxerEvent, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, StreamMuxer, }; use parking_lot::Mutex; -use std::{cmp, iter, task::Context, task::Poll}; +use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; impl UpgradeInfo for MplexConfig { type Info = &'static [u8]; @@ -54,7 +56,8 @@ where fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))), + outbound_substreams: OutboundSubstreams::default(), })) } } @@ -69,7 +72,8 @@ where fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(Multiplex { - io: Mutex::new(io::Multiplexed::new(socket, self)), + io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))), + outbound_substreams: OutboundSubstreams::default(), })) } } @@ -79,117 +83,142 @@ where /// This implementation isn't capable of detecting when the underlying socket changes its address, /// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. pub struct Multiplex { - io: Mutex>, + io: Arc>>, + outbound_substreams: OutboundSubstreams, } impl StreamMuxer for Multiplex where C: AsyncRead + AsyncWrite + Unpin, { - type Substream = Substream; - type OutboundSubstream = OutboundSubstream; - type Error = io::Error; + type Substream = Substream; - fn poll_event( - &self, + fn poll( + &mut self, cx: &mut Context<'_>, ) -> Poll>> { - let stream_id = ready!(self.io.lock().poll_next_stream(cx))?; - let stream = Substream::new(stream_id); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) - } + if let Poll::Ready((mplex_id, outbound_id)) = self + .outbound_substreams + .poll(|| Pin::new(&mut self.io.lock()).poll_open_stream(cx))? + { + let substream = Substream::new(mplex_id, self.io.clone()); + + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream( + substream, + outbound_id, + ))); + } + + if let Poll::Ready(mplex_id) = self.io.lock().poll_next_stream(cx)? { + let substream = Substream::new(mplex_id, self.io.clone()); + + return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))); + } - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} + Poll::Pending } - fn poll_outbound( - &self, - cx: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - let stream_id = ready!(self.io.lock().poll_open_stream(cx))?; - Poll::Ready(Ok(Substream::new(stream_id))) + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.outbound_substreams.open_new() } - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { - // Nothing to do, since `open_outbound` creates no new local state. + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.io.lock().poll_close(cx) } +} - fn read_substream( - &self, +impl AsyncRead for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, cx: &mut Context<'_>, - substream: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { + let this = self.get_mut(); + loop { // Try to read from the current (i.e. last received) frame. - if !substream.current_data.is_empty() { - let len = cmp::min(substream.current_data.len(), buf.len()); - buf[..len].copy_from_slice(&substream.current_data.split_to(len)); + if !this.current_data.is_empty() { + let len = cmp::min(this.current_data.len(), buf.len()); + buf[..len].copy_from_slice(&this.current_data.split_to(len)); return Poll::Ready(Ok(len)); } // Read the next data frame from the multiplexed stream. - match ready!(self.io.lock().poll_read_stream(cx, substream.id))? { + match ready!(this.io.lock().poll_read_stream(cx, this.id))? { Some(data) => { - substream.current_data = data; + this.current_data = data; } None => return Poll::Ready(Ok(0)), } } } +} - fn write_substream( - &self, +impl AsyncWrite for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_write( + self: Pin<&mut Self>, cx: &mut Context<'_>, - substream: &mut Self::Substream, buf: &[u8], - ) -> Poll> { - self.io.lock().poll_write_stream(cx, substream.id, buf) - } + ) -> Poll> { + let this = self.get_mut(); - fn flush_substream( - &self, - cx: &mut Context<'_>, - substream: &mut Self::Substream, - ) -> Poll> { - self.io.lock().poll_flush_stream(cx, substream.id) + this.io.lock().poll_write_stream(cx, this.id, buf) } - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - substream: &mut Self::Substream, - ) -> Poll> { - self.io.lock().poll_close_stream(cx, substream.id) - } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - fn destroy_substream(&self, sub: Self::Substream) { - self.io.lock().drop_stream(sub.id); + this.io.lock().poll_flush_stream(cx, this.id) } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.io.lock().poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let mut io = this.io.lock(); + + ready!(io.poll_close_stream(cx, this.id))?; + ready!(io.poll_flush_stream(cx, this.id))?; + + Poll::Ready(Ok(())) } } -/// Active attempt to open an outbound substream. -pub struct OutboundSubstream {} - /// Active substream to the remote. -pub struct Substream { +pub struct Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ /// The unique, local identifier of the substream. id: LocalStreamId, /// The current data frame the substream is reading from. current_data: Bytes, + /// Shared reference to the actual muxer. + io: Arc>>, } -impl Substream { - fn new(id: LocalStreamId) -> Self { +impl Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn new(id: LocalStreamId, io: Arc>>) -> Self { Self { id, current_data: Bytes::new(), + io, } } } + +impl Drop for Substream +where + C: AsyncRead + AsyncWrite + Unpin, +{ + fn drop(&mut self) { + self.io.lock().drop_stream(self.id); + } +} diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 96b608a68ad..d359d99947d 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -18,10 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{muxing, upgrade, Transport}; +use futures::{channel::oneshot, future::poll_fn, prelude::*}; +use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; -use std::sync::Arc; #[test] fn async_write() { @@ -49,7 +48,7 @@ fn async_write() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -60,8 +59,11 @@ fn async_write() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); @@ -74,9 +76,9 @@ fn async_write() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 77e1a09997b..b30e23d6403 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -18,10 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{muxing, upgrade, Transport}; +use futures::{channel::oneshot, future::poll_fn, prelude::*}; +use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; -use std::sync::Arc; #[test] fn client_to_server_outbound() { @@ -49,7 +48,7 @@ fn client_to_server_outbound() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -60,12 +59,15 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); - let mut buf = Vec::new(); - outbound.read_to_end(&mut buf).await.unwrap(); + let mut buf = vec![0u8; 11]; + outbound.read_exact(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); }); @@ -74,9 +76,9 @@ fn client_to_server_outbound() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() @@ -117,21 +119,19 @@ fn client_to_server_inbound() { tx.send(addr).unwrap(); - let client = Arc::new( - listener - .next() - .await - .unwrap() - .unwrap() - .into_upgrade() - .unwrap() - .0 - .await - .unwrap(), - ); + let mut client = listener + .next() + .await + .unwrap() + .unwrap() + .into_upgrade() + .unwrap() + .0 + .await + .unwrap(); let mut inbound = loop { - if let Some(s) = muxing::event_from_ref_and_wrap(client.clone()) + if let Some(s) = poll_fn(|cx| client.poll(cx)) .await .unwrap() .into_inbound_substream() @@ -150,9 +150,13 @@ fn client_to_server_inbound() { let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -185,7 +189,7 @@ fn protocol_not_match() { tx.send(addr).unwrap(); - let client = listener + let mut client = listener .next() .await .unwrap() @@ -196,8 +200,11 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)) + client.open_outbound(); + let (mut outbound, _) = poll_fn(|cx| client.poll(cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index de739b1b965..62047deebc9 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,6 +1,9 @@ # 0.37.0 - Update to `libp2p-core` `v0.33.0`. +- Remove `YamuxResult` and `YamuxError` from the public API. See [PR 2648]. + +[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648 # 0.36.0 [2022-02-22] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 91bd27376f7..83fac210758 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,21 +24,25 @@ use futures::{ future, prelude::*, - ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::{OutboundSubstreamId, OutboundSubstreams, StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use parking_lot::Mutex; use std::{ fmt, io, iter, mem, pin::Pin, task::{Context, Poll}, }; -use thiserror::Error; /// A Yamux connection. -pub struct Yamux(Mutex>); +pub struct Yamux { + /// The [`futures::stream::Stream`] of incoming substreams. + incoming: S, + /// Handle to control the connection. + control: yamux::Control, + /// Controls the state handling around opening new outbound substreams. + outbound_substreams: OutboundSubstreams, +} impl fmt::Debug for Yamux { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -46,13 +50,6 @@ impl fmt::Debug for Yamux { } } -struct Inner { - /// The [`futures::stream::Stream`] of incoming substreams. - incoming: S, - /// Handle to control the connection. - control: yamux::Control, -} - /// A token to poll for an outbound substream. #[derive(Debug)] pub struct OpenSubstreamToken(()); @@ -65,14 +62,15 @@ where fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: Incoming { stream: yamux::into_stream(conn).err_into().boxed(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - Yamux(Mutex::new(inner)) + outbound_substreams: OutboundSubstreams::default(), + } } } @@ -84,110 +82,62 @@ where fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: LocalIncoming { stream: yamux::into_stream(conn).err_into().boxed_local(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - Yamux(Mutex::new(inner)) + outbound_substreams: OutboundSubstreams::default(), + } } } -pub type YamuxResult = Result; - /// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. impl StreamMuxer for Yamux where - S: Stream> + Unpin, + S: Stream> + Unpin, { type Substream = yamux::Stream; - type OutboundSubstream = OpenSubstreamToken; - type Error = YamuxError; - - fn poll_event( - &self, - c: &mut Context<'_>, - ) -> Poll>> { - let mut inner = self.0.lock(); - match ready!(inner.incoming.poll_next_unpin(c)) { - Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(e)), - None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())), - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { - OpenSubstreamToken(()) - } - - fn poll_outbound( - &self, - c: &mut Context<'_>, - _: &mut OpenSubstreamToken, - ) -> Poll> { - let mut inner = self.0.lock(); - Pin::new(&mut inner.control) - .poll_open_stream(c) - .map_err(YamuxError) - } - fn destroy_outbound(&self, _: Self::OutboundSubstream) { - self.0.lock().control.abort_open_stream() - } + fn poll(&mut self, c: &mut Context<'_>) -> Poll>> { + if let Poll::Ready((stream, id)) = self + .outbound_substreams + .poll(|| Pin::new(&mut self.control).poll_open_stream(c)) + .map_err(to_io_error)? + { + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(stream, id))); + } - fn read_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - b: &mut [u8], - ) -> Poll> { - Pin::new(s) - .poll_read(c, b) - .map_err(|e| YamuxError(e.into())) - } + if let Poll::Ready(next) = self.incoming.poll_next_unpin(c) { + let event = next + .transpose() + .map_err(to_io_error)? + .map(StreamMuxerEvent::InboundSubstream) + .ok_or_else(|| to_io_error(yamux::ConnectionError::Closed))?; - fn write_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - b: &[u8], - ) -> Poll> { - Pin::new(s) - .poll_write(c, b) - .map_err(|e| YamuxError(e.into())) - } + return Poll::Ready(Ok(event)); + } - fn flush_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into())) + Poll::Pending } - fn shutdown_substream( - &self, - c: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into())) + fn open_outbound(&mut self) -> OutboundSubstreamId { + self.outbound_substreams.open_new() } - fn destroy_substream(&self, _: Self::Substream) {} - - fn poll_close(&self, c: &mut Context<'_>) -> Poll> { - let mut inner = self.0.lock(); - - if let Poll::Ready(()) = Pin::new(&mut inner.control) + fn poll_close(&mut self, c: &mut Context<'_>) -> Poll> { + if let Poll::Ready(()) = Pin::new(&mut self.control) .poll_close(c) - .map_err(YamuxError)? + .map_err(to_io_error)? { return Poll::Ready(Ok(())); } - while let Poll::Ready(maybe_inbound_stream) = inner.incoming.poll_next_unpin(c)? { + while let Poll::Ready(maybe_inbound_stream) = + self.incoming.poll_next_unpin(c).map_err(to_io_error)? + { match maybe_inbound_stream { Some(inbound_stream) => mem::drop(inbound_stream), None => return Poll::Ready(Ok(())), @@ -386,23 +336,16 @@ where } } -/// The Yamux [`StreamMuxer`] error type. -#[derive(Debug, Error)] -#[error("yamux error: {0}")] -pub struct YamuxError(#[from] yamux::ConnectionError); - -impl From for io::Error { - fn from(err: YamuxError) -> Self { - match err.0 { - yamux::ConnectionError::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e), - } +fn to_io_error(e: yamux::ConnectionError) -> io::Error { + match e { + yamux::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), } } /// The [`futures::stream::Stream`] of incoming substreams. pub struct Incoming { - stream: BoxStream<'static, Result>, + stream: BoxStream<'static, Result>, _marker: std::marker::PhantomData, } @@ -414,7 +357,7 @@ impl fmt::Debug for Incoming { /// The [`futures::stream::Stream`] of incoming substreams (`!Send`). pub struct LocalIncoming { - stream: LocalBoxStream<'static, Result>, + stream: LocalBoxStream<'static, Result>, _marker: std::marker::PhantomData, } @@ -425,7 +368,7 @@ impl fmt::Debug for LocalIncoming { } impl Stream for Incoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.as_mut().poll_next_unpin(cx) @@ -439,7 +382,7 @@ impl Stream for Incoming { impl Unpin for Incoming {} impl Stream for LocalIncoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream.as_mut().poll_next_unpin(cx) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f55f0367b8c..90eb372f370 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -21,7 +21,6 @@ mod error; mod handler_wrapper; mod listeners; -mod substream; pub(crate) mod pool; @@ -32,17 +31,18 @@ pub use error::{ pub use listeners::{ListenersEvent, ListenersStream}; pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; -pub use substream::{Close, Substream, SubstreamEndpoint}; use crate::handler::ConnectionHandler; +use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::StreamMuxerBox; -use libp2p_core::upgrade; +use libp2p_core::muxing::{OutboundSubstreamId, StreamMuxerBox, StreamMuxerEvent}; use libp2p_core::PeerId; -use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; -use substream::{Muxing, SubstreamEvent}; +use libp2p_core::{upgrade, StreamMuxer}; +use std::collections::HashMap; +use std::future::Future; +use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -68,7 +68,11 @@ where THandler: ConnectionHandler, { /// Node that handles the muxing. - muxing: substream::Muxing>, + muxing: StreamMuxerBox, + + /// Temporary storage of OutboundOpenInfo whilst the substreams are opening. + open_info: HashMap>, + /// Handler that processes substreams. handler: HandlerWrapper, } @@ -76,10 +80,11 @@ where impl fmt::Debug for Connection where THandler: ConnectionHandler + fmt::Debug, + handler_wrapper::OutboundOpenInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") - .field("muxing", &self.muxing) + .field("open_info", &self.open_info) .field("handler", &self.handler) .finish() } @@ -105,7 +110,8 @@ where max_negotiating_inbound_streams, ); Connection { - muxing: Muxing::new(muxer), + muxing: muxer, + open_info: HashMap::default(), handler: wrapped_handler, } } @@ -117,10 +123,10 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (THandler, Close) { + pub fn close(mut self) -> (THandler, impl Future>) { ( self.handler.into_connection_handler(), - self.muxing.close().0, + poll_fn(move |cx| self.muxing.poll_close(cx)), ) } @@ -135,7 +141,8 @@ where match self.handler.poll(cx) { Poll::Pending => {} Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { - self.muxing.open_substream(user_data); + let id = self.muxing.open_outbound(); + self.open_info.insert(id, user_data); continue; } Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { @@ -148,20 +155,19 @@ where // of new substreams. match self.muxing.poll(cx) { Poll::Pending => {} - Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => { + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); continue; } - Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })) => { + Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream, id))) => { + let user_data = self.open_info.remove(&id).expect("unknown substream id"); let endpoint = SubstreamEndpoint::Dialer(user_data); + self.handler.inject_substream(substream, endpoint); continue; } - Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { + Poll::Ready(Ok(StreamMuxerEvent::AddressChange(address))) => { self.handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } @@ -173,6 +179,13 @@ where } } +/// Endpoint for a received substream. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum SubstreamEndpoint { + Dialer(TDialInfo), + Listener, +} + /// Borrowed information about an incoming connection currently being negotiated. #[derive(Debug, Copy, Clone)] pub struct IncomingInfo<'a> { diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs index 05e389d6134..e09efb83082 100644 --- a/swarm/src/connection/handler_wrapper.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::{Substream, SubstreamEndpoint}; +use crate::connection::SubstreamEndpoint; use crate::handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, }; @@ -29,7 +29,7 @@ use futures::stream::FuturesUnordered; use futures_timer::Delay; use instant::Instant; use libp2p_core::{ - muxing::StreamMuxerBox, + muxing::SubstreamBox, upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, Multiaddr, }; @@ -52,20 +52,14 @@ where negotiating_in: FuturesUnordered< SubstreamUpgrade< TConnectionHandler::InboundOpenInfo, - InboundUpgradeApply< - Substream, - SendWrapper, - >, + InboundUpgradeApply>, >, >, /// Futures that upgrade outgoing substreams. negotiating_out: FuturesUnordered< SubstreamUpgrade< TConnectionHandler::OutboundOpenInfo, - OutboundUpgradeApply< - Substream, - SendWrapper, - >, + OutboundUpgradeApply>, >, >, /// For each outbound substream request, how to upgrade it. The first element of the tuple @@ -84,8 +78,8 @@ where /// Note: This only enforces a limit on the number of concurrently /// negotiating inbound streams. The total number of inbound streams on a /// connection is the sum of negotiating and negotiated streams. A limit on - /// the total number of streams can be enforced at the [`StreamMuxerBox`] - /// level. + /// the total number of streams can be enforced at the + /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. max_negotiating_inbound_streams: usize, } @@ -248,7 +242,7 @@ where { pub fn inject_substream( &mut self, - substream: Substream, + substream: SubstreamBox, // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). endpoint: SubstreamEndpoint>, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index a8f28beeadc..740bbbf9632 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -38,7 +38,8 @@ use futures::{ stream::FuturesUnordered, }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::StreamMuxer; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -604,7 +605,7 @@ where match event { task::PendingConnectionEvent::ConnectionEstablished { id, - output: (obtained_peer_id, muxer), + output: (obtained_peer_id, mut muxer), outgoing, } => { let PendingConnectionInfo { diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs deleted file mode 100644 index ba83a67e46a..00000000000 --- a/swarm/src/connection/substream.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::prelude::*; -use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{substream_from_ref, StreamMuxer, StreamMuxerEvent, SubstreamRef}; -use smallvec::SmallVec; -use std::sync::Arc; -use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll}; - -/// Endpoint for a received substream. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SubstreamEndpoint { - Dialer(TDialInfo), - Listener, -} - -/// Implementation of `Stream` that handles substream multiplexing. -/// -/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying -/// the `Muxing` will **not** close the existing substreams. -/// -/// The stream will close once both the inbound and outbound channels are closed, and no more -/// outbound substream attempt is pending. -pub struct Muxing -where - TMuxer: StreamMuxer, -{ - /// The muxer used to manage substreams. - inner: Arc, - /// List of substreams we are currently opening. - outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, -} - -/// Future that signals the remote that we have closed the connection. -pub struct Close { - /// Muxer to close. - muxer: Arc, -} - -/// A successfully opened substream. -pub type Substream = SubstreamRef>; - -/// Event that can happen on the `Muxing`. -pub enum SubstreamEvent -where - TMuxer: StreamMuxer, -{ - /// A new inbound substream arrived. - InboundSubstream { - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: Substream, - }, - - /// An outbound substream has successfully been opened. - OutboundSubstream { - /// User data that has been passed to the `open_substream` method. - user_data: TUserData, - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: Substream, - }, - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -/// Identifier for a substream being opened. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct OutboundSubstreamId(usize); - -impl Muxing -where - TMuxer: StreamMuxer, -{ - /// Creates a new node events stream. - pub fn new(muxer: TMuxer) -> Self { - Muxing { - inner: Arc::new(muxer), - outbound_substreams: SmallVec::new(), - } - } - - /// Starts the process of opening a new outbound substream. - /// - /// After calling this method, polling the stream should eventually produce either an - /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has - /// been passed to this method. - pub fn open_substream(&mut self, user_data: TUserData) { - let raw = self.inner.open_outbound(); - self.outbound_substreams.push((user_data, raw)); - } - - /// Destroys the node stream and returns all the pending outbound substreams, plus an object - /// that signals the remote that we shut down the connection. - #[must_use] - pub fn close(mut self) -> (Close, Vec) { - let substreams = self.cancel_outgoing(); - let close = Close { - muxer: self.inner.clone(), - }; - (close, substreams) - } - - /// Destroys all outbound streams and returns the corresponding user data. - pub fn cancel_outgoing(&mut self) -> Vec { - let mut out = Vec::with_capacity(self.outbound_substreams.len()); - for (user_data, outbound) in self.outbound_substreams.drain(..) { - out.push(user_data); - self.inner.destroy_outbound(outbound); - } - out - } - - /// Provides an API similar to `Future`. - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, IoError>> { - // Polling inbound substream. - match self.inner.poll_event(cx) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { - let substream = substream_from_ref(self.inner.clone(), substream); - return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); - } - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { - return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - Poll::Pending => {} - } - - // Polling outbound substreams. - // We remove each element from `outbound_substreams` one by one and add them back. - for n in (0..self.outbound_substreams.len()).rev() { - let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n); - match self.inner.poll_outbound(cx, &mut outbound) { - Poll::Ready(Ok(substream)) => { - let substream = substream_from_ref(self.inner.clone(), substream); - self.inner.destroy_outbound(outbound); - return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })); - } - Poll::Pending => { - self.outbound_substreams.push((user_data, outbound)); - } - Poll::Ready(Err(err)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err.into())); - } - } - } - - // Nothing happened. Register our task to be notified and return. - Poll::Pending - } -} - -impl fmt::Debug for Muxing -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Muxing") - .field("outbound_substreams", &self.outbound_substreams.len()) - .finish() - } -} - -impl Drop for Muxing -where - TMuxer: StreamMuxer, -{ - fn drop(&mut self) { - // The substreams that were produced will continue to work, as the muxer is held in an Arc. - // However we will no longer process any further inbound or outbound substream, and we - // therefore close everything. - for (_, outbound) in self.outbound_substreams.drain(..) { - self.inner.destroy_outbound(outbound); - } - } -} - -impl Future for Close -where - TMuxer: StreamMuxer, -{ - type Output = Result<(), IoError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.muxer.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), - } - } -} - -impl fmt::Debug for Close -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Close").finish() - } -} - -impl fmt::Debug for SubstreamEvent -where - TMuxer: StreamMuxer, - TMuxer::Substream: fmt::Debug, - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SubstreamEvent::InboundSubstream { substream } => f - .debug_struct("SubstreamEvent::OutboundClosed") - .field("substream", substream) - .finish(), - SubstreamEvent::OutboundSubstream { - user_data, - substream, - } => f - .debug_struct("SubstreamEvent::OutboundSubstream") - .field("user_data", user_data) - .field("substream", substream) - .finish(), - SubstreamEvent::AddressChange(address) => f - .debug_struct("SubstreamEvent::AddressChange") - .field("address", address) - .finish(), - } - } -} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 90b158dbf1e..c5ef1b0aa88 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -79,11 +79,12 @@ pub use handler::{ pub use registry::{AddAddressResult, AddressRecord, AddressScore}; use connection::pool::{Pool, PoolConfig, PoolEvent}; -use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream, Substream}; +use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream}; use dial_opts::{DialOpts, PeerCondition}; use either::Either; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::{ConnectionId, PendingPoint}; +use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ connection::{ConnectedPoint, ListenerId}, multiaddr::Protocol, @@ -110,7 +111,7 @@ use upgrade::UpgradeInfoSend as _; /// /// Implements the [`AsyncRead`](futures::io::AsyncRead) and /// [`AsyncWrite`](futures::io::AsyncWrite) traits. -pub type NegotiatedSubstream = Negotiated>; +pub type NegotiatedSubstream = Negotiated; /// Event generated by the [`NetworkBehaviour`] that the swarm will report back. type TBehaviourOutEvent = ::OutEvent;