Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Introduce boxed module #2703

Merged
merged 1 commit into from Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
259 changes: 3 additions & 256 deletions core/src/muxing.rs
Expand Up @@ -51,19 +51,14 @@
//! The upgrade process will take ownership of the connection, which makes it possible for the
//! implementation of `StreamMuxer` to control everything that happens on the wire.

use fnv::FnvHashMap;
use futures::{future, prelude::*, task::Context, task::Poll};
use multiaddr::Multiaddr;
use parking_lot::Mutex;
use std::{
fmt, io,
ops::Deref,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
};
use std::{fmt, io, ops::Deref, pin::Pin};

pub use self::boxed::StreamMuxerBox;
pub use self::singleton::SingletonMuxer;

mod boxed;
mod singleton;

/// Implemented on objects that can open and manage substreams.
Expand Down Expand Up @@ -491,251 +486,3 @@ where
.destroy_substream(self.substream.take().expect("substream was empty"))
}
}

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
inner: Box<
dyn StreamMuxer<Substream = usize, OutboundSubstream = usize, Error = io::Error>
+ Send
+ Sync,
>,
}

impl StreamMuxerBox {
/// Turns a stream muxer into a `StreamMuxerBox`.
pub fn new<T>(muxer: T) -> StreamMuxerBox
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send,
{
let wrap = Wrap {
inner: muxer,
substreams: Mutex::new(Default::default()),
next_substream: AtomicUsize::new(0),
outbound: Mutex::new(Default::default()),
next_outbound: AtomicUsize::new(0),
};

StreamMuxerBox {
inner: Box::new(wrap),
}
}
}

impl StreamMuxer for StreamMuxerBox {
type Substream = usize; // TODO: use a newtype
type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;

#[inline]
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
self.inner.poll_event(cx)
}

#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
self.inner.open_outbound()
}

#[inline]
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_outbound(cx, s)
}

#[inline]
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
self.inner.destroy_outbound(substream)
}

#[inline]
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
self.inner.read_substream(cx, s, buf)
}

#[inline]
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
self.inner.write_substream(cx, s, buf)
}

#[inline]
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
self.inner.flush_substream(cx, s)
}

#[inline]
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
self.inner.shutdown_substream(cx, s)
}

#[inline]
fn destroy_substream(&self, s: Self::Substream) {
self.inner.destroy_substream(s)
}

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx)
}
}

struct Wrap<T>
where
T: StreamMuxer,
{
inner: T,
substreams: Mutex<FnvHashMap<usize, T::Substream>>,
next_substream: AtomicUsize,
outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
next_outbound: AtomicUsize,
}

impl<T> StreamMuxer for Wrap<T>
where
T: StreamMuxer,
{
type Substream = usize; // TODO: use a newtype
type OutboundSubstream = usize; // TODO: use a newtype
type Error = io::Error;

#[inline]
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
let substream = match self.inner.poll_event(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => {
return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a)))
}
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};

let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
self.substreams.lock().insert(id, substream);
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id)))
}

#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
let outbound = self.inner.open_outbound();
let id = self.next_outbound.fetch_add(1, Ordering::Relaxed);
self.outbound.lock().insert(id, outbound);
id
}

#[inline]
fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
let mut list = self.outbound.lock();
let substream = match self
.inner
.poll_outbound(cx, list.get_mut(substream).unwrap())
{
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(s)) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
self.substreams.lock().insert(id, substream);
Poll::Ready(Ok(id))
}

#[inline]
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
let mut list = self.outbound.lock();
self.inner
.destroy_outbound(list.remove(&substream).unwrap())
}

#[inline]
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.read_substream(cx, list.get_mut(s).unwrap(), buf)
.map_err(|e| e.into())
}

#[inline]
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.write_substream(cx, list.get_mut(s).unwrap(), buf)
.map_err(|e| e.into())
}

#[inline]
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.flush_substream(cx, list.get_mut(s).unwrap())
.map_err(|e| e.into())
}

#[inline]
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
let mut list = self.substreams.lock();
self.inner
.shutdown_substream(cx, list.get_mut(s).unwrap())
.map_err(|e| e.into())
}

#[inline]
fn destroy_substream(&self, substream: Self::Substream) {
let mut list = self.substreams.lock();
self.inner
.destroy_substream(list.remove(&substream).unwrap())
}

#[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(|e| e.into())
}
}