Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

muxers/mplex: Implement AsyncRead and AsyncWrite for Substream #2705

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p"
edition = "2021"
rust-version = "1.60.0"
description = "Peer-to-peer networking library"
version = "0.46.0"
version = "0.47.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.36.0", path = "protocols/floodsub", optional =
libp2p-identify = { version = "0.36.1", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.37.1", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.6.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.33.0", path = "muxers/mplex", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.36.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.36.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.33.0", path = "transports/plaintext", optional = true }
Expand Down
6 changes: 6 additions & 0 deletions muxers/mplex/CHANGELOG.md
@@ -1,3 +1,9 @@
# 0.34.0

- `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR XXXX].

[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX/

# 0.33.0

- Update to `libp2p-core` `v0.33.0`.
Expand Down
2 changes: 1 addition & 1 deletion muxers/mplex/Cargo.toml
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-mplex"
edition = "2021"
rust-version = "1.56.1"
description = "Mplex multiplexing protocol for libp2p"
version = "0.33.0"
version = "0.34.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
124 changes: 94 additions & 30 deletions muxers/mplex/src/lib.rs
Expand Up @@ -33,7 +33,7 @@ use libp2p_core::{
StreamMuxer,
};
use parking_lot::Mutex;
use std::{cmp, iter, task::Context, task::Poll};
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};

impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
Expand All @@ -54,7 +54,7 @@ where

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
}))
}
}
Expand All @@ -69,7 +69,7 @@ where

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Multiplex {
io: Mutex::new(io::Multiplexed::new(socket, self)),
io: Arc::new(Mutex::new(io::Multiplexed::new(socket, self))),
}))
}
}
Expand All @@ -79,14 +79,14 @@ where
/// This implementation isn't capable of detecting when the underlying socket changes its address,
/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted.
pub struct Multiplex<C> {
io: Mutex<io::Multiplexed<C>>,
io: Arc<Mutex<io::Multiplexed<C>>>,
}

impl<C> StreamMuxer for Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
type Substream = Substream;
type Substream = Substream<C>;
type OutboundSubstream = OutboundSubstream;
type Error = io::Error;

Expand All @@ -95,7 +95,7 @@ where
cx: &mut Context<'_>,
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
let stream_id = ready!(self.io.lock().poll_next_stream(cx))?;
let stream = Substream::new(stream_id);
let stream = Substream::new(stream_id, self.io.clone());
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream)))
}

Expand All @@ -109,7 +109,7 @@ where
_: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, io::Error>> {
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
Poll::Ready(Ok(Substream::new(stream_id)))
Poll::Ready(Ok(Substream::new(stream_id, self.io.clone())))
}

fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
Expand All @@ -122,22 +122,7 @@ where
substream: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
loop {
// Try to read from the current (i.e. last received) frame.
if !substream.current_data.is_empty() {
let len = cmp::min(substream.current_data.len(), buf.len());
buf[..len].copy_from_slice(&substream.current_data.split_to(len));
return Poll::Ready(Ok(len));
}

// Read the next data frame from the multiplexed stream.
match ready!(self.io.lock().poll_read_stream(cx, substream.id))? {
Some(data) => {
substream.current_data = data;
}
None => return Poll::Ready(Ok(0)),
}
}
Pin::new(substream).poll_read(cx, buf)
}

fn write_substream(
Expand All @@ -146,27 +131,27 @@ where
substream: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.io.lock().poll_write_stream(cx, substream.id, buf)
Pin::new(substream).poll_write(cx, buf)
}

fn flush_substream(
&self,
cx: &mut Context<'_>,
substream: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_flush_stream(cx, substream.id)
Pin::new(substream).poll_flush(cx)
}

fn shutdown_substream(
&self,
cx: &mut Context<'_>,
substream: &mut Self::Substream,
) -> Poll<Result<(), io::Error>> {
self.io.lock().poll_close_stream(cx, substream.id)
Pin::new(substream).poll_close(cx)
}

fn destroy_substream(&self, sub: Self::Substream) {
self.io.lock().drop_stream(sub.id);
std::mem::drop(sub)
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Expand All @@ -177,19 +162,98 @@ where
/// Active attempt to open an outbound substream.
pub struct OutboundSubstream {}

impl<C> AsyncRead for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

loop {
// Try to read from the current (i.e. last received) frame.
if !this.current_data.is_empty() {
let len = cmp::min(this.current_data.len(), buf.len());
buf[..len].copy_from_slice(&this.current_data.split_to(len));
return Poll::Ready(Ok(len));
}

// Read the next data frame from the multiplexed stream.
match ready!(this.io.lock().poll_read_stream(cx, this.id))? {
Some(data) => {
this.current_data = data;
}
None => return Poll::Ready(Ok(0)),
}
}
}
}

impl<C> AsyncWrite for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();

this.io.lock().poll_write_stream(cx, this.id, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();

this.io.lock().poll_flush_stream(cx, this.id)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
let mut io = this.io.lock();

ready!(io.poll_close_stream(cx, this.id))?;
ready!(io.poll_flush_stream(cx, this.id))?;

Poll::Ready(Ok(()))
}
}

/// Active substream to the remote.
pub struct Substream {
pub struct Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
/// The unique, local identifier of the substream.
id: LocalStreamId,
/// The current data frame the substream is reading from.
current_data: Bytes,
/// Shared reference to the actual muxer.
io: Arc<Mutex<io::Multiplexed<C>>>,
}

impl Substream {
fn new(id: LocalStreamId) -> Self {
impl<C> Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn new(id: LocalStreamId, io: Arc<Mutex<io::Multiplexed<C>>>) -> Self {
Self {
id,
current_data: Bytes::new(),
io,
}
}
}

impl<C> Drop for Substream<C>
where
C: AsyncRead + AsyncWrite + Unpin,
{
fn drop(&mut self) {
self.io.lock().drop_stream(self.id);
}
}