Skip to content

Commit

Permalink
muxers/mplex: Implement AsyncRead and AsyncWrite for Substream (#…
Browse files Browse the repository at this point in the history
…2706)

This aligns the public API of the `libp2p-mplex` module with the one
from `libp2p-yamux`. This change has two benefits:

1. For standalone users of `libp2p-mplex`, the substreams itself are
now useful, similar to `libp2p-yamux` and don't necessarily need to
be polled via the `StreamMuxer`. The `StreamMuxer` only forwards to
the `Async{Read,Write}` implementations.

2. This will reduce the diff of #2648 because we can chunk the one
giant commit into smaller atomic ones.
  • Loading branch information
thomaseizinger committed Jun 20, 2022
1 parent 3c120ef commit ea487ae
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -46,6 +46,8 @@
# 0.46.0 [unreleased]
- Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646].
- Added weak dependencies for features. See [PR 2646].
- Update individual crates.
- Update to [`libp2p-mplex` `v0.34.0`](muxers/mplex/CHANGELOG.md).

[PR 2646]: https://github.com/libp2p/rust-libp2p/pull/2646

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.36.0", path = "protocols/floodsub", optional =
libp2p-identify = { version = "0.36.1", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.37.1", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.6.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.33.0", path = "muxers/mplex", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.36.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.36.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.33.0", path = "transports/plaintext", optional = true }
Expand Down
6 changes: 6 additions & 0 deletions muxers/mplex/CHANGELOG.md
@@ -1,3 +1,9 @@
# 0.34.0 [unreleased]

- `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR 2706].

[PR 2706]: https://github.com/libp2p/rust-libp2p/pull/2706/

# 0.33.0

- Update to `libp2p-core` `v0.33.0`.
Expand Down
2 changes: 1 addition & 1 deletion muxers/mplex/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-mplex"
edition = "2021"
rust-version = "1.56.1"
description = "Mplex multiplexing protocol for libp2p"
version = "0.33.0"
version = "0.34.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
124 changes: 94 additions & 30 deletions muxers/mplex/src/lib.rs
Expand Up @@ -33,7 +33,7 @@ use libp2p_core::{
StreamMuxer,
};
use parking_lot::Mutex;
use std::{cmp, iter, task::Context, task::Poll};
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};

impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
Expand All @@ -54,7 +54,7 @@ where

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
}))
}
}
Expand All @@ -69,7 +69,7 @@ where

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
}))
}
}
Expand All @@ -79,14 +79,14 @@ where
/// This implementation isn't capable of detecting when the underlying socket changes its address,
/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
pub struct Multiplex<C> {
io: Mutex<io::Multiplexed<C>>,
io: Arc<Mutex<io::Multiplexed<C>>>,
}

impl<C> StreamMuxer for Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
type Substream = Substream;
type Substream = Substream<C>;
type OutboundSubstream = OutboundSubstream;
type Error = io::Error;

Expand All @@ -95,7 +95,7 @@ where
cx: &mut Context<'_>,
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
let stream = Substream::new(stream_id);
let stream = Substream::new(stream_id, self.io.clone());
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
}

Expand All @@ -109,7 +109,7 @@ where
_: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, io::Error>> {
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
Poll::Ready(Ok(Substream::new(stream_id)))
Poll::Ready(Ok(Substream::new(stream_id, self.io.clone())))
}

fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
Expand All @@ -122,22 +122,7 @@ where
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
loop {
// Try to read from the current (i.e. last received) frame.
if !substream.current_data.is_empty() {
let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
return Poll::Ready(Ok(len));
}

// Read the next data frame from the multiplexed stream.
match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
Some(data) => {
substream.current_data = data;
}
None => return Poll::Ready(Ok(0)),
}
}
Pin::new(substream).poll_read(cx, buf)
}

fn write_substream(
Expand All @@ -146,27 +131,27 @@ where
substream: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.io.lock().poll_write_stream(cx, substream.id, buf)
Pin::new(substream).poll_write(cx, buf)
}

fn flush_substream(
&self,
cx: &mut Context<'_>,
substream: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_flush_stream(cx, substream.id)
Pin::new(substream).poll_flush(cx)
}

fn shutdown_substream(
&self,
cx: &mut Context<'_>,
substream: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_close_stream(cx, substream.id)
Pin::new(substream).poll_close(cx)
}

fn destroy_substream(&self, sub: Self::Substream) {
self.io.lock().drop_stream(sub.id);
std::mem::drop(sub)
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Expand All @@ -177,19 +162,98 @@ where
/// Active attempt to open an outbound substream.
pub struct OutboundSubstream {}

impl<C> AsyncRead for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

loop {
// Try to read from the current (i.e. last received) frame.
if !this.current_data.is_empty() {
let len = cmp::min(this.current_data.len(), buf.len());
buf[..len].copy_from_slice(&this.current_data.split_to(len));
return Poll::Ready(Ok(len));
}

// Read the next data frame from the multiplexed stream.
match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
Some(data) => {
this.current_data = data;
}
None => return Poll::Ready(Ok(0)),
}
}
}
}

impl<C> AsyncWrite for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

this.io.lock().poll_write_stream(cx, this.id, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();

this.io.lock().poll_flush_stream(cx, this.id)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
let mut io = this.io.lock();

ready!(io.poll_close_stream(cx, this.id))?;
ready!(io.poll_flush_stream(cx, this.id))?;

Poll::Ready(Ok(()))
}
}

/// Active substream to the remote.
pub struct Substream {
pub struct Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
/// The unique, local identifier of the substream.
id: LocalStreamId,
/// The current data frame the substream is reading from.
current_data: Bytes,
/// Shared reference to the actual muxer.
io: Arc<Mutex<io::Multiplexed<C>>>,
}

impl Substream {
fn new(id: LocalStreamId) -> Self {
impl<C> Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
Self {
id,
current_data: Bytes::new(),
io,
}
}
}

impl<C> Drop for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn drop(&mut self) {
self.io.lock().drop_stream(self.id);
}
}

0 comments on commit ea487ae

Please sign in to comment.