diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 6d28588233e..658d612ef90 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -22,6 +22,9 @@ categories = ["asynchronous"] [features] default = ["time"] time = ["tokio/time"] +net = ["tokio/net"] +io-util = ["tokio/io-util"] +fs = ["tokio/fs"] [dependencies] futures-core = { version = "0.3.0" } @@ -34,4 +37,4 @@ tokio = { version = "1.0", path = "../tokio", features = ["full"] } tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -proptest = "0.10.0" \ No newline at end of file +proptest = "0.10.0" diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index 3c6d84ee469..a6d6f22ead5 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -81,6 +81,8 @@ #[macro_use] mod macros; +pub mod wrappers; + mod stream_ext; pub use stream_ext::{collect::FromStream, StreamExt}; diff --git a/tokio-stream/src/macros.rs b/tokio-stream/src/macros.rs index 0d493a85119..39ad86cc5d8 100644 --- a/tokio-stream/src/macros.rs +++ b/tokio-stream/src/macros.rs @@ -1,3 +1,33 @@ +macro_rules! cfg_fs { + ($($item:item)*) => { + $( + #[cfg(feature = "fs")] + #[cfg_attr(docsrs, doc(cfg(feature = "fs")))] + $item + )* + } +} + +macro_rules! cfg_io_util { + ($($item:item)*) => { + $( + #[cfg(feature = "io-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + $item + )* + } +} + +macro_rules! cfg_net { + ($($item:item)*) => { + $( + #[cfg(feature = "net")] + #[cfg_attr(docsrs, doc(cfg(feature = "net")))] + $item + )* + } +} + macro_rules! cfg_time { ($($item:item)*) => { $( diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs new file mode 100644 index 00000000000..c0ffb234a09 --- /dev/null +++ b/tokio-stream/src/wrappers.rs @@ -0,0 +1,35 @@ +//! Wrappers for Tokio types that implement `Stream`. + +mod mpsc_bounded; +pub use mpsc_bounded::ReceiverStream; + +mod mpsc_unbounded; +pub use mpsc_unbounded::UnboundedReceiverStream; + +cfg_time! { + mod interval; + pub use interval::IntervalStream; +} + +cfg_net! { + mod tcp_listener; + pub use tcp_listener::TcpListenerStream; + + #[cfg(unix)] + mod unix_listener; + #[cfg(unix)] + pub use unix_listener::UnixListenerStream; +} + +cfg_io_util! { + mod split; + pub use split::SplitStream; + + mod lines; + pub use lines::LinesStream; +} + +cfg_fs! { + mod read_dir; + pub use read_dir::ReadDirStream; +} diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs new file mode 100644 index 00000000000..2bf0194bd0f --- /dev/null +++ b/tokio-stream/src/wrappers/interval.rs @@ -0,0 +1,50 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{Instant, Interval}; + +/// A wrapper around [`Interval`] that implements [`Stream`]. +/// +/// [`Interval`]: struct@tokio::time::Interval +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "time")))] +pub struct IntervalStream { + inner: Interval, +} + +impl IntervalStream { + /// Create a new `IntervalStream`. + pub fn new(interval: Interval) -> Self { + Self { inner: interval } + } + + /// Get back the inner `Interval`. + pub fn into_inner(self) -> Interval { + self.inner + } +} + +impl Stream for IntervalStream { + type Item = Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_tick(cx).map(Some) + } + + fn size_hint(&self) -> (usize, Option) { + (std::usize::MAX, None) + } +} + +impl AsRef for IntervalStream { + fn as_ref(&self) -> &Interval { + &self.inner + } +} + +impl AsMut for IntervalStream { + fn as_mut(&mut self) -> &mut Interval { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs new file mode 100644 index 00000000000..4850429a72d --- /dev/null +++ b/tokio-stream/src/wrappers/lines.rs @@ -0,0 +1,59 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Lines}; + +pin_project! { + /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. + /// + /// [`tokio::io::Lines`]: struct@tokio::io::Lines + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct LinesStream { + #[pin] + inner: Lines, + } +} + +impl LinesStream { + /// Create a new `LinesStream`. + pub fn new(lines: Lines) -> Self { + Self { inner: lines } + } + + /// Get back the inner `Lines`. + pub fn into_inner(self) -> Lines { + self.inner + } + + /// Obtain a pinned reference to the inner `Lines`. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines> { + self.project().inner + } +} + +impl Stream for LinesStream { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .inner + .poll_next_line(cx) + .map(Result::transpose) + } +} + +impl AsRef> for LinesStream { + fn as_ref(&self) -> &Lines { + &self.inner + } +} + +impl AsMut> for LinesStream { + fn as_mut(&mut self) -> &mut Lines { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs new file mode 100644 index 00000000000..e4f90000985 --- /dev/null +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -0,0 +1,59 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::Receiver; + +/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct ReceiverStream { + inner: Receiver, +} + +impl ReceiverStream { + /// Create a new `ReceiverStream`. + pub fn new(recv: Receiver) -> Self { + Self { inner: recv } + } + + /// Get back the inner `Receiver`. + pub fn into_inner(self) -> Receiver { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. Any + /// outstanding [`Permit`] values will still be able to send messages. + /// + /// To guarantee no messages are dropped, after calling `close()`, you must + /// receive all items from the stream until `None` is returned. + /// + /// [`Permit`]: struct@tokio::sync::mpsc::Permit + pub fn close(&mut self) { + self.inner.close() + } +} + +impl Stream for ReceiverStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_recv(cx) + } +} + +impl AsRef> for ReceiverStream { + fn as_ref(&self) -> &Receiver { + &self.inner + } +} + +impl AsMut> for ReceiverStream { + fn as_mut(&mut self) -> &mut Receiver { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs new file mode 100644 index 00000000000..bc5f40cdc9f --- /dev/null +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -0,0 +1,53 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::UnboundedReceiver; + +/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct UnboundedReceiverStream { + inner: UnboundedReceiver, +} + +impl UnboundedReceiverStream { + /// Create a new `UnboundedReceiverStream`. + pub fn new(recv: UnboundedReceiver) -> Self { + Self { inner: recv } + } + + /// Get back the inner `UnboundedReceiver`. + pub fn into_inner(self) -> UnboundedReceiver { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.inner.close() + } +} + +impl Stream for UnboundedReceiverStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_recv(cx) + } +} + +impl AsRef> for UnboundedReceiverStream { + fn as_ref(&self) -> &UnboundedReceiver { + &self.inner + } +} + +impl AsMut> for UnboundedReceiverStream { + fn as_mut(&mut self) -> &mut UnboundedReceiver { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs new file mode 100644 index 00000000000..b5cf54f79e1 --- /dev/null +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -0,0 +1,47 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::{DirEntry, ReadDir}; + +/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`]. +/// +/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "fs")))] +pub struct ReadDirStream { + inner: ReadDir, +} + +impl ReadDirStream { + /// Create a new `ReadDirStream`. + pub fn new(read_dir: ReadDir) -> Self { + Self { inner: read_dir } + } + + /// Get back the inner `ReadDir`. + pub fn into_inner(self) -> ReadDir { + self.inner + } +} + +impl Stream for ReadDirStream { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_next_entry(cx).map(Result::transpose) + } +} + +impl AsRef for ReadDirStream { + fn as_ref(&self) -> &ReadDir { + &self.inner + } +} + +impl AsMut for ReadDirStream { + fn as_mut(&mut self) -> &mut ReadDir { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs new file mode 100644 index 00000000000..ac46a8ba6ff --- /dev/null +++ b/tokio-stream/src/wrappers/split.rs @@ -0,0 +1,59 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Split}; + +pin_project! { + /// A wrapper around [`tokio::io::Split`] that implements [`Stream`]. + /// + /// [`tokio::io::Split`]: struct@tokio::io::Split + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct SplitStream { + #[pin] + inner: Split, + } +} + +impl SplitStream { + /// Create a new `SplitStream`. + pub fn new(split: Split) -> Self { + Self { inner: split } + } + + /// Get back the inner `Split`. + pub fn into_inner(self) -> Split { + self.inner + } + + /// Obtain a pinned reference to the inner `Split`. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split> { + self.project().inner + } +} + +impl Stream for SplitStream { + type Item = io::Result>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .inner + .poll_next_segment(cx) + .map(Result::transpose) + } +} + +impl AsRef> for SplitStream { + fn as_ref(&self) -> &Split { + &self.inner + } +} + +impl AsMut> for SplitStream { + fn as_mut(&mut self) -> &mut Split { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs new file mode 100644 index 00000000000..ce7cb163507 --- /dev/null +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -0,0 +1,54 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{TcpListener, TcpStream}; + +/// A wrapper around [`TcpListener`] that implements [`Stream`]. +/// +/// [`TcpListener`]: struct@tokio::net::TcpListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +pub struct TcpListenerStream { + inner: TcpListener, +} + +impl TcpListenerStream { + /// Create a new `TcpListenerStream`. + pub fn new(listener: TcpListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `TcpListener`. + pub fn into_inner(self) -> TcpListener { + self.inner + } +} + +impl Stream for TcpListenerStream { + type Item = io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsRef for TcpListenerStream { + fn as_ref(&self) -> &TcpListener { + &self.inner + } +} + +impl AsMut for TcpListenerStream { + fn as_mut(&mut self) -> &mut TcpListener { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs new file mode 100644 index 00000000000..0beba588c20 --- /dev/null +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -0,0 +1,54 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{UnixListener, UnixStream}; + +/// A wrapper around [`UnixListener`] that implements [`Stream`]. +/// +/// [`UnixListener`]: struct@tokio::net::UnixListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))] +pub struct UnixListenerStream { + inner: UnixListener, +} + +impl UnixListenerStream { + /// Create a new `UnixListenerStream`. + pub fn new(listener: UnixListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `UnixListener`. + pub fn into_inner(self) -> UnixListener { + self.inner + } +} + +impl Stream for UnixListenerStream { + type Item = io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsRef for UnixListenerStream { + fn as_ref(&self) -> &UnixListener { + &self.inner + } +} + +impl AsMut for UnixListenerStream { + fn as_mut(&mut self) -> &mut UnixListener { + &mut self.inner + } +} diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 7b21c9ccec0..aedaf7b921e 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -20,12 +20,15 @@ pub async fn read_dir(path: impl AsRef) -> io::Result { Ok(ReadDir(State::Idle(Some(std)))) } -/// Stream of the entries in a directory. +/// Read the the entries in a directory. /// -/// This stream is returned from the [`read_dir`] function of this module and -/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] -/// information like the entry's path and possibly other metadata can be -/// learned. +/// This struct is returned from the [`read_dir`] function of this module and +/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information +/// like the entry's path and possibly other metadata can be learned. +/// +/// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`]. +/// +/// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html /// /// # Errors /// diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 25df78e99f1..ed6a944409e 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -8,7 +8,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method. + /// Read lines from an [`AsyncBufRead`]. + /// + /// A `Lines` can be turned into a `Stream` with [`LinesStream`]. + /// + /// This type is usually created using the [`lines`] method. + /// + /// [`AsyncBufRead`]: crate::io::AsyncBufRead + /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html + /// [`lines`]: crate::io::AsyncBufReadExt::lines #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index eb828659e08..4f3ce4eff02 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -8,7 +8,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method. + /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method. + /// + /// A `Split` can be turned into a `Stream` with [`SplitStream`]. + /// + /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index a2a8637ecff..a13f1d34258 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -14,6 +14,10 @@ cfg_net! { /// You can accept a new connection by using the [`accept`](`TcpListener::accept`) /// method. /// + /// A `TcpListener` can be turned into a `Stream` with [`TcpListenerStream`]. + /// + /// [`TcpListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.TcpListenerStream.html + /// /// # Errors /// /// Note that accepting a connection can lead to various errors and not all diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 9ed4ce175b0..d1c063e729f 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -14,6 +14,10 @@ cfg_net_unix! { /// /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. /// + /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`]. + /// + /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html + /// /// # Errors /// /// Note that accepting a connection can lead to various errors and not all diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 2dae7e26fe4..985167ec565 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -33,6 +33,10 @@ pub struct Permit<'a, T> { /// Receive values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. +/// +/// This receiver can be turned into a `Stream` using [`ReceiverStream`]. +/// +/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html pub struct Receiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 38953b8f978..29a0a29719e 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -33,6 +33,10 @@ impl fmt::Debug for UnboundedSender { /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. +/// +/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. +/// +/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index be93ba12c69..20cfceccce3 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -106,7 +106,16 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { } } -/// Stream returned by [`interval`](interval) and [`interval_at`](interval_at). +/// Interval returned by [`interval`](interval) and [`interval_at`](interval_at). +/// +/// This type allows you to wait on a sequence of instants with a certain +/// duration between each instant. Unlike calling [`sleep`](crate::time::sleep) +/// in a loop, this lets you count the time spent between the calls to `sleep` +/// as well. +/// +/// An `Interval` can be turned into a `Stream` with [`IntervalStream`]. +/// +/// [`IntervalStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.IntervalStream.html #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value.