diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 170a4537901..969c56c782e 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -51,19 +51,14 @@ //! The upgrade process will take ownership of the connection, which makes it possible for the //! implementation of `StreamMuxer` to control everything that happens on the wire. -use fnv::FnvHashMap; use futures::{future, prelude::*, task::Context, task::Poll}; use multiaddr::Multiaddr; -use parking_lot::Mutex; -use std::{ - fmt, io, - ops::Deref, - pin::Pin, - sync::atomic::{AtomicUsize, Ordering}, -}; +use std::{fmt, io, ops::Deref, pin::Pin}; +pub use self::boxed::StreamMuxerBox; pub use self::singleton::SingletonMuxer; +mod boxed; mod singleton; /// Implemented on objects that can open and manage substreams. @@ -491,251 +486,3 @@ where .destroy_substream(self.substream.take().expect("substream was empty")) } } - -/// Abstract `StreamMuxer`. -pub struct StreamMuxerBox { - inner: Box< - dyn StreamMuxer - + Send - + Sync, - >, -} - -impl StreamMuxerBox { - /// Turns a stream muxer into a `StreamMuxerBox`. - pub fn new(muxer: T) -> StreamMuxerBox - where - T: StreamMuxer + Send + Sync + 'static, - T::OutboundSubstream: Send, - T::Substream: Send, - { - let wrap = Wrap { - inner: muxer, - substreams: Mutex::new(Default::default()), - next_substream: AtomicUsize::new(0), - outbound: Mutex::new(Default::default()), - next_outbound: AtomicUsize::new(0), - }; - - StreamMuxerBox { - inner: Box::new(wrap), - } - } -} - -impl StreamMuxer for StreamMuxerBox { - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; - - #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.inner.poll_event(cx) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - self.inner.open_outbound() - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll> { - self.inner.poll_outbound(cx, s) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - self.inner.destroy_outbound(substream) - } - - #[inline] - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - self.inner.read_substream(cx, s, buf) - } - - #[inline] - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - self.inner.write_substream(cx, s, buf) - } - - #[inline] - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.flush_substream(cx, s) - } - - #[inline] - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - self.inner.shutdown_substream(cx, s) - } - - #[inline] - fn destroy_substream(&self, s: Self::Substream) { - self.inner.destroy_substream(s) - } - - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) - } -} - -struct Wrap -where - T: StreamMuxer, -{ - inner: T, - substreams: Mutex>, - next_substream: AtomicUsize, - outbound: Mutex>, - next_outbound: AtomicUsize, -} - -impl StreamMuxer for Wrap -where - T: StreamMuxer, -{ - type Substream = usize; // TODO: use a newtype - type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; - - #[inline] - fn poll_event( - &self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let substream = match self.inner.poll_event(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { - return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) - } - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - }; - - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id))) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - let outbound = self.inner.open_outbound(); - let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); - self.outbound.lock().insert(id, outbound); - id - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut list = self.outbound.lock(); - let substream = match self - .inner - .poll_outbound(cx, list.get_mut(substream).unwrap()) - { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - }; - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Poll::Ready(Ok(id)) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - let mut list = self.outbound.lock(); - self.inner - .destroy_outbound(list.remove(&substream).unwrap()) - } - - #[inline] - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .read_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) - } - - #[inline] - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .write_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) - } - - #[inline] - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .flush_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) - } - - #[inline] - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - let mut list = self.substreams.lock(); - self.inner - .shutdown_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) - } - - #[inline] - fn destroy_substream(&self, substream: Self::Substream) { - let mut list = self.substreams.lock(); - self.inner - .destroy_substream(list.remove(&substream).unwrap()) - } - - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(|e| e.into()) - } -} diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs new file mode 100644 index 00000000000..1284febe5dd --- /dev/null +++ b/core/src/muxing/boxed.rs @@ -0,0 +1,255 @@ +use crate::muxing::StreamMuxerEvent; +use crate::StreamMuxer; +use fnv::FnvHashMap; +use parking_lot::Mutex; +use std::io; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; + +/// Abstract `StreamMuxer`. +pub struct StreamMuxerBox { + inner: Box< + dyn StreamMuxer + + Send + + Sync, + >, +} + +struct Wrap +where + T: StreamMuxer, +{ + inner: T, + substreams: Mutex>, + next_substream: AtomicUsize, + outbound: Mutex>, + next_outbound: AtomicUsize, +} + +impl StreamMuxer for Wrap +where + T: StreamMuxer, +{ + type Substream = usize; // TODO: use a newtype + type OutboundSubstream = usize; // TODO: use a newtype + type Error = io::Error; + + #[inline] + fn poll_event( + &self, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let substream = match self.inner.poll_event(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { + return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) + } + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + }; + + let id = self.next_substream.fetch_add(1, Ordering::Relaxed); + self.substreams.lock().insert(id, substream); + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id))) + } + + #[inline] + fn open_outbound(&self) -> Self::OutboundSubstream { + let outbound = self.inner.open_outbound(); + let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); + self.outbound.lock().insert(id, outbound); + id + } + + #[inline] + fn poll_outbound( + &self, + cx: &mut Context<'_>, + substream: &mut Self::OutboundSubstream, + ) -> Poll> { + let mut list = self.outbound.lock(); + let substream = match self + .inner + .poll_outbound(cx, list.get_mut(substream).unwrap()) + { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(s)) => s, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + }; + let id = self.next_substream.fetch_add(1, Ordering::Relaxed); + self.substreams.lock().insert(id, substream); + Poll::Ready(Ok(id)) + } + + #[inline] + fn destroy_outbound(&self, substream: Self::OutboundSubstream) { + let mut list = self.outbound.lock(); + self.inner + .destroy_outbound(list.remove(&substream).unwrap()) + } + + #[inline] + fn read_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + buf: &mut [u8], + ) -> Poll> { + let mut list = self.substreams.lock(); + self.inner + .read_substream(cx, list.get_mut(s).unwrap(), buf) + .map_err(|e| e.into()) + } + + #[inline] + fn write_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + buf: &[u8], + ) -> Poll> { + let mut list = self.substreams.lock(); + self.inner + .write_substream(cx, list.get_mut(s).unwrap(), buf) + .map_err(|e| e.into()) + } + + #[inline] + fn flush_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + ) -> Poll> { + let mut list = self.substreams.lock(); + self.inner + .flush_substream(cx, list.get_mut(s).unwrap()) + .map_err(|e| e.into()) + } + + #[inline] + fn shutdown_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + ) -> Poll> { + let mut list = self.substreams.lock(); + self.inner + .shutdown_substream(cx, list.get_mut(s).unwrap()) + .map_err(|e| e.into()) + } + + #[inline] + fn destroy_substream(&self, substream: Self::Substream) { + let mut list = self.substreams.lock(); + self.inner + .destroy_substream(list.remove(&substream).unwrap()) + } + + #[inline] + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx).map_err(|e| e.into()) + } +} + +impl StreamMuxerBox { + /// Turns a stream muxer into a `StreamMuxerBox`. + pub fn new(muxer: T) -> StreamMuxerBox + where + T: StreamMuxer + Send + Sync + 'static, + T::OutboundSubstream: Send, + T::Substream: Send, + { + let wrap = Wrap { + inner: muxer, + substreams: Mutex::new(Default::default()), + next_substream: AtomicUsize::new(0), + outbound: Mutex::new(Default::default()), + next_outbound: AtomicUsize::new(0), + }; + + StreamMuxerBox { + inner: Box::new(wrap), + } + } +} + +impl StreamMuxer for StreamMuxerBox { + type Substream = usize; // TODO: use a newtype + type OutboundSubstream = usize; // TODO: use a newtype + type Error = io::Error; + + #[inline] + fn poll_event( + &self, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + self.inner.poll_event(cx) + } + + #[inline] + fn open_outbound(&self) -> Self::OutboundSubstream { + self.inner.open_outbound() + } + + #[inline] + fn poll_outbound( + &self, + cx: &mut Context<'_>, + s: &mut Self::OutboundSubstream, + ) -> Poll> { + self.inner.poll_outbound(cx, s) + } + + #[inline] + fn destroy_outbound(&self, substream: Self::OutboundSubstream) { + self.inner.destroy_outbound(substream) + } + + #[inline] + fn read_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + buf: &mut [u8], + ) -> Poll> { + self.inner.read_substream(cx, s, buf) + } + + #[inline] + fn write_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + buf: &[u8], + ) -> Poll> { + self.inner.write_substream(cx, s, buf) + } + + #[inline] + fn flush_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + ) -> Poll> { + self.inner.flush_substream(cx, s) + } + + #[inline] + fn shutdown_substream( + &self, + cx: &mut Context<'_>, + s: &mut Self::Substream, + ) -> Poll> { + self.inner.shutdown_substream(cx, s) + } + + #[inline] + fn destroy_substream(&self, s: Self::Substream) { + self.inner.destroy_substream(s) + } + + #[inline] + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx) + } +}