From d21cd5fed76d879b02c4f78ea00e02737498b8ca Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 16 May 2022 19:17:49 +0200 Subject: [PATCH] misc/: Move rw-stream-sink into rust-libp2p monorepo (#2641) * misc/rw-stream-sink: Initial commit See https://github.com/paritytech/rw-stream-sink/commit/62a923cc272ab9a5ec27d44b05eb2d75be248e91 * misc/rw-stream-sink: Update to Rust edition 2021 * misc/rw-stream-sink: Add changelog --- CHANGELOG.md | 1 + Cargo.toml | 1 + core/Cargo.toml | 2 +- misc/multistream-select/Cargo.toml | 2 +- misc/rw-stream-sink/CHANGELOG.md | 7 + misc/rw-stream-sink/Cargo.toml | 18 +++ misc/rw-stream-sink/src/lib.rs | 217 +++++++++++++++++++++++++++++ transports/websocket/Cargo.toml | 2 +- 8 files changed, 247 insertions(+), 3 deletions(-) create mode 100644 misc/rw-stream-sink/CHANGELOG.md create mode 100644 misc/rw-stream-sink/Cargo.toml create mode 100644 misc/rw-stream-sink/src/lib.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 668048a6af2..331fcbd413b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ - [`libp2p-metrics` CHANGELOG](misc/metrics/CHANGELOG.md) - [`multistream-select` CHANGELOG](misc/multistream-select/CHANGELOG.md) +- [`rw-stream-sink` CHANGELOG](misc/rw-stream-sink/CHANGELOG.md) # `libp2p` facade crate diff --git a/Cargo.toml b/Cargo.toml index e01ed3778db..9f3fe5f495e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,7 @@ members = [ "core", "misc/metrics", "misc/multistream-select", + "misc/rw-stream-sink", "misc/keygen", "misc/prost-codec", "muxers/mplex", diff --git a/core/Cargo.toml b/core/Cargo.toml index de923fce311..41e0c2a1763 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -30,7 +30,7 @@ parking_lot = "0.12.0" pin-project = "1.0.0" prost = "0.10" rand = "0.8" -rw-stream-sink = "0.2.0" +rw-stream-sink = { version = "0.3.0", path = "../misc/rw-stream-sink" } sha2 = "0.10.0" smallvec = "1.6.1" thiserror = "1.0" diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index a3845446fb4..9ca21c0362a 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -27,4 +27,4 @@ libp2p-mplex = { path = "../../muxers/mplex" } libp2p-plaintext = { path = "../../transports/plaintext" } quickcheck = "0.9.0" rand = "0.7.2" -rw-stream-sink = "0.2.1" +rw-stream-sink = { version = "0.3.0", path = "../../misc/rw-stream-sink" } diff --git a/misc/rw-stream-sink/CHANGELOG.md b/misc/rw-stream-sink/CHANGELOG.md new file mode 100644 index 00000000000..121aaeb9cb1 --- /dev/null +++ b/misc/rw-stream-sink/CHANGELOG.md @@ -0,0 +1,7 @@ +# 0.3.0 [unreleased] + +- Move from https://github.com/paritytech/rw-stream-sink/ to https://github.com/libp2p/rust-libp2p. See [Issue 2504]. + +- Update to Rust edition 2021. + +[Issue 2504]: https://github.com/libp2p/rust-libp2p/issues/2504 \ No newline at end of file diff --git a/misc/rw-stream-sink/Cargo.toml b/misc/rw-stream-sink/Cargo.toml new file mode 100644 index 00000000000..9c95e557ee5 --- /dev/null +++ b/misc/rw-stream-sink/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rw-stream-sink" +edition = "2021" +description = "Adaptator between Stream/Sink and AsyncRead/AsyncWrite" +version = "0.3.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +futures = "0.3.1" +pin-project = "0.4.6" +static_assertions = "1" + +[dev-dependencies] +async-std = "1.0" diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs new file mode 100644 index 00000000000..10f5ec09fe1 --- /dev/null +++ b/misc/rw-stream-sink/src/lib.rs @@ -0,0 +1,217 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! This crate provides the [`RwStreamSink`] type. It wraps around a [`Stream`] +//! and [`Sink`] that produces and accepts byte arrays, and implements +//! [`AsyncRead`] and [`AsyncWrite`]. +//! +//! Each call to [`AsyncWrite::poll_write`] will send one packet to the sink. +//! Calls to [`AsyncRead::poll_read`] will read from the stream's incoming packets. + +use futures::{prelude::*, ready}; +use std::{ + io::{self, Read}, + mem, + pin::Pin, + task::{Context, Poll}, +}; + +static_assertions::const_assert!(mem::size_of::() <= mem::size_of::()); + +/// Wraps a [`Stream`] and [`Sink`] whose items are buffers. +/// Implements [`AsyncRead`] and [`AsyncWrite`]. +#[pin_project::pin_project] +pub struct RwStreamSink { + #[pin] + inner: S, + current_item: Option::Ok>>, +} + +impl RwStreamSink { + /// Wraps around `inner`. + pub fn new(inner: S) -> Self { + RwStreamSink { + inner, + current_item: None, + } + } +} + +impl AsyncRead for RwStreamSink +where + S: TryStream, + ::Ok: AsRef<[u8]>, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let mut this = self.project(); + + // Grab the item to copy from. + let item_to_copy = loop { + if let Some(ref mut i) = this.current_item { + if i.position() < i.get_ref().as_ref().len() as u64 { + break i; + } + } + *this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) { + Some(Ok(i)) => std::io::Cursor::new(i), + Some(Err(e)) => return Poll::Ready(Err(e)), + None => return Poll::Ready(Ok(0)), // EOF + }); + }; + + // Copy it! + Poll::Ready(Ok(item_to_copy.read(buf)?)) + } +} + +impl AsyncWrite for RwStreamSink +where + S: TryStream + Sink<::Ok, Error = io::Error>, + ::Ok: for<'r> From<&'r [u8]>, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let mut this = self.project(); + ready!(this.inner.as_mut().poll_ready(cx)?); + let n = buf.len(); + if let Err(e) = this.inner.start_send(buf.into()) { + return Poll::Ready(Err(e)); + } + Poll::Ready(Ok(n)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } +} + +#[cfg(test)] +mod tests { + use super::RwStreamSink; + use async_std::task; + use futures::{channel::mpsc, prelude::*, stream}; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + // This struct merges a stream and a sink and is quite useful for tests. + struct Wrapper(St, Si); + + impl Stream for Wrapper + where + St: Stream + Unpin, + Si: Unpin, + { + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.0.poll_next_unpin(cx) + } + } + + impl Sink for Wrapper + where + St: Unpin, + Si: Sink + Unpin, + { + type Error = Si::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_ready(cx) + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Pin::new(&mut self.1).start_send(item) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_close(cx) + } + } + + #[test] + fn basic_reading() { + let (tx1, _) = mpsc::channel::>(10); + let (mut tx2, rx2) = mpsc::channel(10); + + let mut wrapper = RwStreamSink::new(Wrapper(rx2.map(Ok), tx1)); + + task::block_on(async move { + tx2.send(Vec::from("hel")).await.unwrap(); + tx2.send(Vec::from("lo wor")).await.unwrap(); + tx2.send(Vec::from("ld")).await.unwrap(); + tx2.close().await.unwrap(); + + let mut data = Vec::new(); + wrapper.read_to_end(&mut data).await.unwrap(); + assert_eq!(data, b"hello world"); + }) + } + + #[test] + fn skip_empty_stream_items() { + let data: Vec<&[u8]> = vec![b"", b"foo", b"", b"bar", b"", b"baz", b""]; + let mut rws = RwStreamSink::new(stream::iter(data).map(Ok)); + let mut buf = [0; 9]; + task::block_on(async move { + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(3, rws.read(&mut buf[3..]).await.unwrap()); + assert_eq!(3, rws.read(&mut buf[6..]).await.unwrap()); + assert_eq!(0, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"foobarbaz", &buf[..]) + }) + } + + #[test] + fn partial_read() { + let data: Vec<&[u8]> = vec![b"hell", b"o world"]; + let mut rws = RwStreamSink::new(stream::iter(data).map(Ok)); + let mut buf = [0; 3]; + task::block_on(async move { + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"hel", &buf[..3]); + assert_eq!(0, rws.read(&mut buf[..0]).await.unwrap()); + assert_eq!(1, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"l", &buf[..1]); + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"o w", &buf[..3]); + assert_eq!(0, rws.read(&mut buf[..0]).await.unwrap()); + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"orl", &buf[..3]); + assert_eq!(1, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"d", &buf[..1]); + assert_eq!(0, rws.read(&mut buf).await.unwrap()); + }) + } +} diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index bc93facae9b..e4bbe3297da 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -18,7 +18,7 @@ libp2p-core = { version = "0.33.0", path = "../../core", default-features = fals log = "0.4.8" parking_lot = "0.12.0" quicksink = "0.1" -rw-stream-sink = "0.2.0" +rw-stream-sink = { version = "0.3.0", path = "../../misc/rw-stream-sink" } soketto = { version = "0.7.0", features = ["deflate"] } url = "2.1" webpki-roots = "0.22"