From 5686a22d42d371cafabb357718d80a8d793b9c0f Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 15:48:54 +0100 Subject: [PATCH 1/9] chore: create stream_ext module --- tokio-stream/src/lib.rs | 918 +----------------- tokio-stream/src/stream_ext.rs | 917 +++++++++++++++++ tokio-stream/src/{ => stream_ext}/all.rs | 0 tokio-stream/src/{ => stream_ext}/any.rs | 0 tokio-stream/src/{ => stream_ext}/chain.rs | 3 +- tokio-stream/src/{ => stream_ext}/collect.rs | 0 tokio-stream/src/{ => stream_ext}/filter.rs | 0 .../src/{ => stream_ext}/filter_map.rs | 0 tokio-stream/src/{ => stream_ext}/fold.rs | 0 tokio-stream/src/{ => stream_ext}/fuse.rs | 0 tokio-stream/src/{ => stream_ext}/map.rs | 0 tokio-stream/src/{ => stream_ext}/merge.rs | 3 +- tokio-stream/src/{ => stream_ext}/next.rs | 0 tokio-stream/src/{ => stream_ext}/skip.rs | 0 .../src/{ => stream_ext}/skip_while.rs | 0 tokio-stream/src/{ => stream_ext}/take.rs | 0 .../src/{ => stream_ext}/take_while.rs | 0 tokio-stream/src/{ => stream_ext}/throttle.rs | 0 tokio-stream/src/{ => stream_ext}/timeout.rs | 3 +- tokio-stream/src/{ => stream_ext}/try_next.rs | 3 +- 20 files changed, 927 insertions(+), 920 deletions(-) create mode 100644 tokio-stream/src/stream_ext.rs rename tokio-stream/src/{ => stream_ext}/all.rs (100%) rename tokio-stream/src/{ => stream_ext}/any.rs (100%) rename tokio-stream/src/{ => stream_ext}/chain.rs (95%) rename tokio-stream/src/{ => stream_ext}/collect.rs (100%) rename tokio-stream/src/{ => stream_ext}/filter.rs (100%) rename tokio-stream/src/{ => stream_ext}/filter_map.rs (100%) rename tokio-stream/src/{ => stream_ext}/fold.rs (100%) rename tokio-stream/src/{ => stream_ext}/fuse.rs (100%) rename tokio-stream/src/{ => stream_ext}/map.rs (100%) rename tokio-stream/src/{ => stream_ext}/merge.rs (97%) rename tokio-stream/src/{ => stream_ext}/next.rs (100%) rename tokio-stream/src/{ => stream_ext}/skip.rs (100%) rename tokio-stream/src/{ => stream_ext}/skip_while.rs (100%) rename tokio-stream/src/{ => stream_ext}/take.rs (100%) rename tokio-stream/src/{ => stream_ext}/take_while.rs (100%) rename tokio-stream/src/{ => stream_ext}/throttle.rs (100%) rename tokio-stream/src/{ => stream_ext}/timeout.rs (97%) rename tokio-stream/src/{ => stream_ext}/try_next.rs (95%) diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index dbe865054f0..3c6d84ee469 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -81,46 +81,15 @@ #[macro_use] mod macros; -mod all; -use all::AllFuture; - -mod any; -use any::AnyFuture; - -mod chain; -use chain::Chain; - -mod collect; -use collect::Collect; -pub use collect::FromStream; +mod stream_ext; +pub use stream_ext::{collect::FromStream, StreamExt}; mod empty; pub use empty::{empty, Empty}; -mod filter; -use filter::Filter; - -mod filter_map; -use filter_map::FilterMap; - -mod fold; -use fold::FoldFuture; - -mod fuse; -use fuse::Fuse; - mod iter; pub use iter::{iter, Iter}; -mod map; -use map::Map; - -mod merge; -use merge::Merge; - -mod next; -use next::Next; - mod once; pub use once::{once, Once}; @@ -130,888 +99,5 @@ pub use pending::{pending, Pending}; mod stream_map; pub use stream_map::StreamMap; -mod skip; -use skip::Skip; - -mod skip_while; -use skip_while::SkipWhile; - -mod try_next; -use try_next::TryNext; - -mod take; -use take::Take; - -mod take_while; -use take_while::TakeWhile; - -cfg_time! { - mod timeout; - use timeout::Timeout; - use tokio::time::Duration; - mod throttle; - use crate::throttle::{throttle, Throttle}; -} - #[doc(no_inline)] pub use futures_core::Stream; - -/// An extension trait for the [`Stream`] trait that provides a variety of -/// convenient combinator functions. -/// -/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found -/// in the [futures] crate, however both Tokio and futures provide separate -/// `StreamExt` utility traits, and some utilities are only available on one of -/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` -/// trait in the futures crate. -/// -/// If you need utilities from both `StreamExt` traits, you should prefer to -/// import one of them, and use the other through the fully qualified call -/// syntax. For example: -/// ``` -/// // import one of the traits: -/// use futures::stream::StreamExt; -/// # #[tokio::main(flavor = "current_thread")] -/// # async fn main() { -/// -/// let a = tokio_stream::iter(vec![1, 3, 5]); -/// let b = tokio_stream::iter(vec![2, 4, 6]); -/// -/// // use the fully qualified call syntax for the other trait: -/// let merged = tokio_stream::StreamExt::merge(a, b); -/// -/// // use normal call notation for futures::stream::StreamExt::collect -/// let output: Vec<_> = merged.collect().await; -/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); -/// # } -/// ``` -/// -/// [`Stream`]: crate::Stream -/// [futures]: https://docs.rs/futures -/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html -pub trait StreamExt: Stream { - /// Consumes and returns the next value in the stream or `None` if the - /// stream is finished. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn next(&mut self) -> Option; - /// ``` - /// - /// Note that because `next` doesn't take ownership over the stream, - /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a - /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can - /// be done by boxing the stream using [`Box::pin`] or - /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` - /// crate. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=3); - /// - /// assert_eq!(stream.next().await, Some(1)); - /// assert_eq!(stream.next().await, Some(2)); - /// assert_eq!(stream.next().await, Some(3)); - /// assert_eq!(stream.next().await, None); - /// # } - /// ``` - fn next(&mut self) -> Next<'_, Self> - where - Self: Unpin, - { - Next::new(self) - } - - /// Consumes and returns the next item in the stream. If an error is - /// encountered before the next item, the error is returned instead. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn try_next(&mut self) -> Result, E>; - /// ``` - /// - /// This is similar to the [`next`](StreamExt::next) combinator, - /// but returns a [`Result, E>`](Result) rather than - /// an [`Option>`](Option), making for easy use - /// with the [`?`](std::ops::Try) operator. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); - /// - /// assert_eq!(stream.try_next().await, Ok(Some(1))); - /// assert_eq!(stream.try_next().await, Ok(Some(2))); - /// assert_eq!(stream.try_next().await, Err("nope")); - /// # } - /// ``` - fn try_next(&mut self) -> TryNext<'_, Self> - where - Self: Stream> + Unpin, - { - TryNext::new(self) - } - - /// Maps this stream's items to a different type, returning a new stream of - /// the resulting type. - /// - /// The provided closure is executed over all elements of this stream as - /// they are made available. It is executed inline with calls to - /// [`poll_next`](Stream::poll_next). - /// - /// Note that this function consumes the stream passed into it and returns a - /// wrapped version of it, similar to the existing `map` methods in the - /// standard library. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let stream = stream::iter(1..=3); - /// let mut stream = stream.map(|x| x + 3); - /// - /// assert_eq!(stream.next().await, Some(4)); - /// assert_eq!(stream.next().await, Some(5)); - /// assert_eq!(stream.next().await, Some(6)); - /// # } - /// ``` - fn map(self, f: F) -> Map - where - F: FnMut(Self::Item) -> T, - Self: Sized, - { - Map::new(self, f) - } - - /// Combine two streams into one by interleaving the output of both as it - /// is produced. - /// - /// Values are produced from the merged stream in the order they arrive from - /// the two source streams. If both source streams provide values - /// simultaneously, the merge stream alternates between them. This provides - /// some level of fairness. You should not chain calls to `merge`, as this - /// will break the fairness of the merging. - /// - /// The merged stream completes once **both** source streams complete. When - /// one source stream completes before the other, the merge stream - /// exclusively polls the remaining stream. - /// - /// For merging multiple streams, consider using [`StreamMap`] instead. - /// - /// [`StreamMap`]: crate::StreamMap - /// - /// # Examples - /// - /// ``` - /// use tokio_stream::{StreamExt, Stream}; - /// use tokio::sync::mpsc; - /// use tokio::time; - /// - /// use std::time::Duration; - /// use std::pin::Pin; - /// - /// # /* - /// #[tokio::main] - /// # */ - /// # #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// # time::pause(); - /// let (tx1, mut rx1) = mpsc::channel::(10); - /// let (tx2, mut rx2) = mpsc::channel::(10); - /// - /// // Convert the channels to a `Stream`. - /// let rx1 = Box::pin(async_stream::stream! { - /// while let Some(item) = rx1.recv().await { - /// yield item; - /// } - /// }) as Pin + Send>>; - /// - /// let rx2 = Box::pin(async_stream::stream! { - /// while let Some(item) = rx2.recv().await { - /// yield item; - /// } - /// }) as Pin + Send>>; - /// - /// let mut rx = rx1.merge(rx2); - /// - /// tokio::spawn(async move { - /// // Send some values immediately - /// tx1.send(1).await.unwrap(); - /// tx1.send(2).await.unwrap(); - /// - /// // Let the other task send values - /// time::sleep(Duration::from_millis(20)).await; - /// - /// tx1.send(4).await.unwrap(); - /// }); - /// - /// tokio::spawn(async move { - /// // Wait for the first task to send values - /// time::sleep(Duration::from_millis(5)).await; - /// - /// tx2.send(3).await.unwrap(); - /// - /// time::sleep(Duration::from_millis(25)).await; - /// - /// // Send the final value - /// tx2.send(5).await.unwrap(); - /// }); - /// - /// assert_eq!(1, rx.next().await.unwrap()); - /// assert_eq!(2, rx.next().await.unwrap()); - /// assert_eq!(3, rx.next().await.unwrap()); - /// assert_eq!(4, rx.next().await.unwrap()); - /// assert_eq!(5, rx.next().await.unwrap()); - /// - /// // The merged stream is consumed - /// assert!(rx.next().await.is_none()); - /// } - /// ``` - fn merge(self, other: U) -> Merge - where - U: Stream, - Self: Sized, - { - Merge::new(self, other) - } - - /// Filters the values produced by this stream according to the provided - /// predicate. - /// - /// As values of this stream are made available, the provided predicate `f` - /// will be run against them. If the predicate - /// resolves to `true`, then the stream will yield the value, but if the - /// predicate resolves to `false`, then the value - /// will be discarded and the next value will be produced. - /// - /// Note that this function consumes the stream passed into it and returns a - /// wrapped version of it, similar to [`Iterator::filter`] method in the - /// standard library. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let stream = stream::iter(1..=8); - /// let mut evens = stream.filter(|x| x % 2 == 0); - /// - /// assert_eq!(Some(2), evens.next().await); - /// assert_eq!(Some(4), evens.next().await); - /// assert_eq!(Some(6), evens.next().await); - /// assert_eq!(Some(8), evens.next().await); - /// assert_eq!(None, evens.next().await); - /// # } - /// ``` - fn filter(self, f: F) -> Filter - where - F: FnMut(&Self::Item) -> bool, - Self: Sized, - { - Filter::new(self, f) - } - - /// Filters the values produced by this stream while simultaneously mapping - /// them to a different type according to the provided closure. - /// - /// As values of this stream are made available, the provided function will - /// be run on them. If the predicate `f` resolves to - /// [`Some(item)`](Some) then the stream will yield the value `item`, but if - /// it resolves to [`None`], then the value will be skipped. - /// - /// Note that this function consumes the stream passed into it and returns a - /// wrapped version of it, similar to [`Iterator::filter_map`] method in the - /// standard library. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let stream = stream::iter(1..=8); - /// let mut evens = stream.filter_map(|x| { - /// if x % 2 == 0 { Some(x + 1) } else { None } - /// }); - /// - /// assert_eq!(Some(3), evens.next().await); - /// assert_eq!(Some(5), evens.next().await); - /// assert_eq!(Some(7), evens.next().await); - /// assert_eq!(Some(9), evens.next().await); - /// assert_eq!(None, evens.next().await); - /// # } - /// ``` - fn filter_map(self, f: F) -> FilterMap - where - F: FnMut(Self::Item) -> Option, - Self: Sized, - { - FilterMap::new(self, f) - } - - /// Creates a stream which ends after the first `None`. - /// - /// After a stream returns `None`, behavior is undefined. Future calls to - /// `poll_next` may or may not return `Some(T)` again or they may panic. - /// `fuse()` adapts a stream, ensuring that after `None` is given, it will - /// return `None` forever. - /// - /// # Examples - /// - /// ``` - /// use tokio_stream::{Stream, StreamExt}; - /// - /// use std::pin::Pin; - /// use std::task::{Context, Poll}; - /// - /// // a stream which alternates between Some and None - /// struct Alternate { - /// state: i32, - /// } - /// - /// impl Stream for Alternate { - /// type Item = i32; - /// - /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - /// let val = self.state; - /// self.state = self.state + 1; - /// - /// // if it's even, Some(i32), else None - /// if val % 2 == 0 { - /// Poll::Ready(Some(val)) - /// } else { - /// Poll::Ready(None) - /// } - /// } - /// } - /// - /// #[tokio::main] - /// async fn main() { - /// let mut stream = Alternate { state: 0 }; - /// - /// // the stream goes back and forth - /// assert_eq!(stream.next().await, Some(0)); - /// assert_eq!(stream.next().await, None); - /// assert_eq!(stream.next().await, Some(2)); - /// assert_eq!(stream.next().await, None); - /// - /// // however, once it is fused - /// let mut stream = stream.fuse(); - /// - /// assert_eq!(stream.next().await, Some(4)); - /// assert_eq!(stream.next().await, None); - /// - /// // it will always return `None` after the first time. - /// assert_eq!(stream.next().await, None); - /// assert_eq!(stream.next().await, None); - /// assert_eq!(stream.next().await, None); - /// } - /// ``` - fn fuse(self) -> Fuse - where - Self: Sized, - { - Fuse::new(self) - } - - /// Creates a new stream of at most `n` items of the underlying stream. - /// - /// Once `n` items have been yielded from this stream then it will always - /// return that the stream is done. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=10).take(3); - /// - /// assert_eq!(Some(1), stream.next().await); - /// assert_eq!(Some(2), stream.next().await); - /// assert_eq!(Some(3), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn take(self, n: usize) -> Take - where - Self: Sized, - { - Take::new(self, n) - } - - /// Take elements from this stream while the provided predicate - /// resolves to `true`. - /// - /// This function, like `Iterator::take_while`, will take elements from the - /// stream until the predicate `f` resolves to `false`. Once one element - /// returns false it will always return that the stream is done. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); - /// - /// assert_eq!(Some(1), stream.next().await); - /// assert_eq!(Some(2), stream.next().await); - /// assert_eq!(Some(3), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn take_while(self, f: F) -> TakeWhile - where - F: FnMut(&Self::Item) -> bool, - Self: Sized, - { - TakeWhile::new(self, f) - } - - /// Creates a new stream that will skip the `n` first items of the - /// underlying stream. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=10).skip(7); - /// - /// assert_eq!(Some(8), stream.next().await); - /// assert_eq!(Some(9), stream.next().await); - /// assert_eq!(Some(10), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn skip(self, n: usize) -> Skip - where - Self: Sized, - { - Skip::new(self, n) - } - - /// Skip elements from the underlying stream while the provided predicate - /// resolves to `true`. - /// - /// This function, like [`Iterator::skip_while`], will ignore elemets from the - /// stream until the predicate `f` resolves to `false`. Once one element - /// returns false, the rest of the elements will be yielded. - /// - /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); - /// - /// assert_eq!(Some(3), stream.next().await); - /// assert_eq!(Some(4), stream.next().await); - /// assert_eq!(Some(1), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn skip_while(self, f: F) -> SkipWhile - where - F: FnMut(&Self::Item) -> bool, - Self: Sized, - { - SkipWhile::new(self, f) - } - - /// Tests if every element of the stream matches a predicate. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn all(&mut self, f: F) -> bool; - /// ``` - /// - /// `all()` takes a closure that returns `true` or `false`. It applies - /// this closure to each element of the stream, and if they all return - /// `true`, then so does `all`. If any of them return `false`, it - /// returns `false`. An empty stream returns `true`. - /// - /// `all()` is short-circuiting; in other words, it will stop processing - /// as soon as it finds a `false`, given that no matter what else happens, - /// the result will also be `false`. - /// - /// An empty stream returns `true`. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// assert!(stream::iter(&a).all(|&x| x > 0).await); - /// - /// assert!(!stream::iter(&a).all(|&x| x > 2).await); - /// # } - /// ``` - /// - /// Stopping at the first `false`: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// let mut iter = stream::iter(&a); - /// - /// assert!(!iter.all(|&x| x != 2).await); - /// - /// // we can still use `iter`, as there are more elements. - /// assert_eq!(iter.next().await, Some(&3)); - /// # } - /// ``` - fn all(&mut self, f: F) -> AllFuture<'_, Self, F> - where - Self: Unpin, - F: FnMut(Self::Item) -> bool, - { - AllFuture::new(self, f) - } - - /// Tests if any element of the stream matches a predicate. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn any(&mut self, f: F) -> bool; - /// ``` - /// - /// `any()` takes a closure that returns `true` or `false`. It applies - /// this closure to each element of the stream, and if any of them return - /// `true`, then so does `any()`. If they all return `false`, it - /// returns `false`. - /// - /// `any()` is short-circuiting; in other words, it will stop processing - /// as soon as it finds a `true`, given that no matter what else happens, - /// the result will also be `true`. - /// - /// An empty stream returns `false`. - /// - /// Basic usage: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// assert!(stream::iter(&a).any(|&x| x > 0).await); - /// - /// assert!(!stream::iter(&a).any(|&x| x > 5).await); - /// # } - /// ``` - /// - /// Stopping at the first `true`: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// let mut iter = stream::iter(&a); - /// - /// assert!(iter.any(|&x| x != 2).await); - /// - /// // we can still use `iter`, as there are more elements. - /// assert_eq!(iter.next().await, Some(&2)); - /// # } - /// ``` - fn any(&mut self, f: F) -> AnyFuture<'_, Self, F> - where - Self: Unpin, - F: FnMut(Self::Item) -> bool, - { - AnyFuture::new(self, f) - } - - /// Combine two streams into one by first returning all values from the - /// first stream then all values from the second stream. - /// - /// As long as `self` still has values to emit, no values from `other` are - /// emitted, even if some are ready. - /// - /// # Examples - /// - /// ``` - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let one = stream::iter(vec![1, 2, 3]); - /// let two = stream::iter(vec![4, 5, 6]); - /// - /// let mut stream = one.chain(two); - /// - /// assert_eq!(stream.next().await, Some(1)); - /// assert_eq!(stream.next().await, Some(2)); - /// assert_eq!(stream.next().await, Some(3)); - /// assert_eq!(stream.next().await, Some(4)); - /// assert_eq!(stream.next().await, Some(5)); - /// assert_eq!(stream.next().await, Some(6)); - /// assert_eq!(stream.next().await, None); - /// } - /// ``` - fn chain(self, other: U) -> Chain - where - U: Stream, - Self: Sized, - { - Chain::new(self, other) - } - - /// A combinator that applies a function to every element in a stream - /// producing a single, final value. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn fold(self, init: B, f: F) -> B; - /// ``` - /// - /// # Examples - /// Basic usage: - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, *}; - /// - /// let s = stream::iter(vec![1u8, 2, 3]); - /// let sum = s.fold(0, |acc, x| acc + x).await; - /// - /// assert_eq!(sum, 6); - /// # } - /// ``` - fn fold(self, init: B, f: F) -> FoldFuture - where - Self: Sized, - F: FnMut(B, Self::Item) -> B, - { - FoldFuture::new(self, init, f) - } - - /// Drain stream pushing all emitted values into a collection. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn collect(self) -> T; - /// ``` - /// - /// `collect` streams all values, awaiting as needed. Values are pushed into - /// a collection. A number of different target collection types are - /// supported, including [`Vec`](std::vec::Vec), - /// [`String`](std::string::String), and [`Bytes`]. - /// - /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html - /// - /// # `Result` - /// - /// `collect()` can also be used with streams of type `Result` where - /// `T: FromStream<_>`. In this case, `collect()` will stream as long as - /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, - /// streaming is terminated and `collect()` returns the `Err`. - /// - /// # Notes - /// - /// `FromStream` is currently a sealed trait. Stabilization is pending - /// enhancements to the Rust language. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let doubled: Vec = - /// stream::iter(vec![1, 2, 3]) - /// .map(|x| x * 2) - /// .collect() - /// .await; - /// - /// assert_eq!(vec![2, 4, 6], doubled); - /// } - /// ``` - /// - /// Collecting a stream of `Result` values - /// - /// ``` - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// // A stream containing only `Ok` values will be collected - /// let values: Result, &str> = - /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) - /// .collect() - /// .await; - /// - /// assert_eq!(Ok(vec![1, 2, 3]), values); - /// - /// // A stream containing `Err` values will return the first error. - /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; - /// - /// let values: Result, &str> = - /// stream::iter(results) - /// .collect() - /// .await; - /// - /// assert_eq!(Err("no"), values); - /// } - /// ``` - fn collect(self) -> Collect - where - T: FromStream, - Self: Sized, - { - Collect::new(self) - } - - /// Applies a per-item timeout to the passed stream. - /// - /// `timeout()` takes a `Duration` that represents the maximum amount of - /// time each element of the stream has to complete before timing out. - /// - /// If the wrapped stream yields a value before the deadline is reached, the - /// value is returned. Otherwise, an error is returned. The caller may decide - /// to continue consuming the stream and will eventually get the next source - /// stream value once it becomes available. - /// - /// # Notes - /// - /// This function consumes the stream passed into it and returns a - /// wrapped version of it. - /// - /// Polling the returned stream will continue to poll the inner stream even - /// if one or more items time out. - /// - /// # Examples - /// - /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// use std::time::Duration; - /// # let int_stream = stream::iter(1..=3); - /// - /// let int_stream = int_stream.timeout(Duration::from_secs(1)); - /// tokio::pin!(int_stream); - /// - /// // When no items time out, we get the 3 elements in succession: - /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); - /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); - /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); - /// assert_eq!(int_stream.try_next().await, Ok(None)); - /// - /// // If the second item times out, we get an error and continue polling the stream: - /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); - /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); - /// assert!(int_stream.try_next().await.is_err()); - /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); - /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); - /// assert_eq!(int_stream.try_next().await, Ok(None)); - /// - /// // If we want to stop consuming the source stream the first time an - /// // element times out, we can use the `take_while` operator: - /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); - /// let mut int_stream = int_stream.take_while(Result::is_ok); - /// - /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); - /// assert_eq!(int_stream.try_next().await, Ok(None)); - /// # } - /// ``` - #[cfg(all(feature = "time"))] - #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn timeout(self, duration: Duration) -> Timeout - where - Self: Sized, - { - Timeout::new(self, duration) - } - - /// Slows down a stream by enforcing a delay between items. - /// - /// # Example - /// - /// Create a throttled stream. - /// ```rust,no_run - /// use std::time::Duration; - /// use tokio_stream::StreamExt; - /// - /// # async fn dox() { - /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); - /// tokio::pin!(item_stream); - /// - /// loop { - /// // The string will be produced at most every 2 seconds - /// println!("{:?}", item_stream.next().await); - /// } - /// # } - /// ``` - #[cfg(all(feature = "time"))] - #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn throttle(self, duration: Duration) -> Throttle - where - Self: Sized, - { - throttle(duration, self) - } -} - -impl StreamExt for St where St: Stream {} - -/// Merge the size hints from two streams. -fn merge_size_hints( - (left_low, left_high): (usize, Option), - (right_low, right_hign): (usize, Option), -) -> (usize, Option) { - let low = left_low.saturating_add(right_low); - let high = match (left_high, right_hign) { - (Some(h1), Some(h2)) => h1.checked_add(h2), - _ => None, - }; - (low, high) -} diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs new file mode 100644 index 00000000000..51532ee1cb6 --- /dev/null +++ b/tokio-stream/src/stream_ext.rs @@ -0,0 +1,917 @@ +use futures_core::Stream; + +mod all; +use all::AllFuture; + +mod any; +use any::AnyFuture; + +mod chain; +use chain::Chain; + +pub(crate) mod collect; +use collect::{Collect, FromStream}; + +mod filter; +use filter::Filter; + +mod filter_map; +use filter_map::FilterMap; + +mod fold; +use fold::FoldFuture; + +mod fuse; +use fuse::Fuse; + +mod map; +use map::Map; + +mod merge; +use merge::Merge; + +mod next; +use next::Next; + +mod skip; +use skip::Skip; + +mod skip_while; +use skip_while::SkipWhile; + +mod try_next; +use try_next::TryNext; + +mod take; +use take::Take; + +mod take_while; +use take_while::TakeWhile; + +cfg_time! { + mod timeout; + use timeout::Timeout; + use tokio::time::Duration; + mod throttle; + use throttle::{throttle, Throttle}; +} + +/// An extension trait for the [`Stream`] trait that provides a variety of +/// convenient combinator functions. +/// +/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found +/// in the [futures] crate, however both Tokio and futures provide separate +/// `StreamExt` utility traits, and some utilities are only available on one of +/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` +/// trait in the futures crate. +/// +/// If you need utilities from both `StreamExt` traits, you should prefer to +/// import one of them, and use the other through the fully qualified call +/// syntax. For example: +/// ``` +/// // import one of the traits: +/// use futures::stream::StreamExt; +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() { +/// +/// let a = tokio_stream::iter(vec![1, 3, 5]); +/// let b = tokio_stream::iter(vec![2, 4, 6]); +/// +/// // use the fully qualified call syntax for the other trait: +/// let merged = tokio_stream::StreamExt::merge(a, b); +/// +/// // use normal call notation for futures::stream::StreamExt::collect +/// let output: Vec<_> = merged.collect().await; +/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); +/// # } +/// ``` +/// +/// [`Stream`]: crate::Stream +/// [futures]: https://docs.rs/futures +/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html +pub trait StreamExt: Stream { + /// Consumes and returns the next value in the stream or `None` if the + /// stream is finished. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Consumes and returns the next item in the stream. If an error is + /// encountered before the next item, the error is returned instead. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn try_next(&mut self) -> Result, E>; + /// ``` + /// + /// This is similar to the [`next`](StreamExt::next) combinator, + /// but returns a [`Result, E>`](Result) rather than + /// an [`Option>`](Option), making for easy use + /// with the [`?`](std::ops::Try) operator. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(1))); + /// assert_eq!(stream.try_next().await, Ok(Some(2))); + /// assert_eq!(stream.try_next().await, Err("nope")); + /// # } + /// ``` + fn try_next(&mut self) -> TryNext<'_, Self> + where + Self: Stream> + Unpin, + { + TryNext::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map(self, f: F) -> Map + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } + + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. You should not chain calls to `merge`, as this + /// will break the fairness of the merging. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// For merging multiple streams, consider using [`StreamMap`] instead. + /// + /// [`StreamMap`]: crate::StreamMap + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamExt, Stream}; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// use std::pin::Pin; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// # time::pause(); + /// let (tx1, mut rx1) = mpsc::channel::(10); + /// let (tx2, mut rx2) = mpsc::channel::(10); + /// + /// // Convert the channels to a `Stream`. + /// let rx1 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx1.recv().await { + /// yield item; + /// } + /// }) as Pin + Send>>; + /// + /// let rx2 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx2.recv().await { + /// yield item; + /// } + /// }) as Pin + Send>>; + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::sleep(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::sleep(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::sleep(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge(self, other: U) -> Merge + where + U: Stream, + Self: Sized, + { + Merge::new(self, other) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate `f` + /// will be run against them. If the predicate + /// resolves to `true`, then the stream will yield the value, but if the + /// predicate resolves to `false`, then the value + /// will be discarded and the next value will be produced. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter(|x| x % 2 == 0); + /// + /// assert_eq!(Some(2), evens.next().await); + /// assert_eq!(Some(4), evens.next().await); + /// assert_eq!(Some(6), evens.next().await); + /// assert_eq!(Some(8), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter(self, f: F) -> Filter + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + Filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type according to the provided closure. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate `f` resolves to + /// [`Some(item)`](Some) then the stream will yield the value `item`, but if + /// it resolves to [`None`], then the value will be skipped. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter_map`] method in the + /// standard library. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter_map(|x| { + /// if x % 2 == 0 { Some(x + 1) } else { None } + /// }); + /// + /// assert_eq!(Some(3), evens.next().await); + /// assert_eq!(Some(5), evens.next().await); + /// assert_eq!(Some(7), evens.next().await); + /// assert_eq!(Some(9), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter_map(self, f: F) -> FilterMap + where + F: FnMut(Self::Item) -> Option, + Self: Sized, + { + FilterMap::new(self, f) + } + + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse + where + Self: Sized, + { + Fuse::new(self) + } + + /// Creates a new stream of at most `n` items of the underlying stream. + /// + /// Once `n` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take(3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take(self, n: usize) -> Take + where + Self: Sized, + { + Take::new(self, n) + } + + /// Take elements from this stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take_while(self, f: F) -> TakeWhile + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + TakeWhile::new(self, f) + } + + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + Skip::new(self, n) + } + + /// Skip elements from the underlying stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false, the rest of the elements will be yielded. + /// + /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); + /// + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(Some(4), stream.next().await); + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip_while(self, f: F) -> SkipWhile + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + SkipWhile::new(self, f) + } + + /// Tests if every element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn all(&mut self, f: F) -> bool; + /// ``` + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all`. If any of them return `false`, it + /// returns `false`. An empty stream returns `true`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).all(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).all(|&x| x > 2).await); + /// # } + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(!iter.all(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&3)); + /// # } + /// ``` + fn all(&mut self, f: F) -> AllFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AllFuture::new(self, f) + } + + /// Tests if any element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn any(&mut self, f: F) -> bool; + /// ``` + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).any(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).any(|&x| x > 5).await); + /// # } + /// ``` + /// + /// Stopping at the first `true`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(iter.any(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&2)); + /// # } + /// ``` + fn any(&mut self, f: F) -> AnyFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AnyFuture::new(self, f) + } + + /// Combine two streams into one by first returning all values from the + /// first stream then all values from the second stream. + /// + /// As long as `self` still has values to emit, no values from `other` are + /// emitted, even if some are ready. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let one = stream::iter(vec![1, 2, 3]); + /// let two = stream::iter(vec![4, 5, 6]); + /// + /// let mut stream = one.chain(two); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn chain(self, other: U) -> Chain + where + U: Stream, + Self: Sized, + { + Chain::new(self, other) + } + + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fold(self, init: B, f: F) -> B; + /// ``` + /// + /// # Examples + /// Basic usage: + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, *}; + /// + /// let s = stream::iter(vec![1u8, 2, 3]); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # } + /// ``` + fn fold(self, init: B, f: F) -> FoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + + /// Drain stream pushing all emitted values into a collection. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn collect(self) -> T; + /// ``` + /// + /// `collect` streams all values, awaiting as needed. Values are pushed into + /// a collection. A number of different target collection types are + /// supported, including [`Vec`](std::vec::Vec), + /// [`String`](std::string::String), and [`Bytes`]. + /// + /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html + /// + /// # `Result` + /// + /// `collect()` can also be used with streams of type `Result` where + /// `T: FromStream<_>`. In this case, `collect()` will stream as long as + /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, + /// streaming is terminated and `collect()` returns the `Err`. + /// + /// # Notes + /// + /// `FromStream` is currently a sealed trait. Stabilization is pending + /// enhancements to the Rust language. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let doubled: Vec = + /// stream::iter(vec![1, 2, 3]) + /// .map(|x| x * 2) + /// .collect() + /// .await; + /// + /// assert_eq!(vec![2, 4, 6], doubled); + /// } + /// ``` + /// + /// Collecting a stream of `Result` values + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// // A stream containing only `Ok` values will be collected + /// let values: Result, &str> = + /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) + /// .collect() + /// .await; + /// + /// assert_eq!(Ok(vec![1, 2, 3]), values); + /// + /// // A stream containing `Err` values will return the first error. + /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; + /// + /// let values: Result, &str> = + /// stream::iter(results) + /// .collect() + /// .await; + /// + /// assert_eq!(Err("no"), values); + /// } + /// ``` + fn collect(self) -> Collect + where + T: FromStream, + Self: Sized, + { + Collect::new(self) + } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout(Duration::from_secs(1)); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout(self, duration: Duration) -> Timeout + where + Self: Sized, + { + Timeout::new(self, duration) + } + + /// Slows down a stream by enforcing a delay between items. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio_stream::StreamExt; + /// + /// # async fn dox() { + /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// tokio::pin!(item_stream); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle + where + Self: Sized, + { + throttle(duration, self) + } +} + +impl StreamExt for St where St: Stream {} + +/// Merge the size hints from two streams. +fn merge_size_hints( + (left_low, left_high): (usize, Option), + (right_low, right_hign): (usize, Option), +) -> (usize, Option) { + let low = left_low.saturating_add(right_low); + let high = match (left_high, right_hign) { + (Some(h1), Some(h2)) => h1.checked_add(h2), + _ => None, + }; + (low, high) +} diff --git a/tokio-stream/src/all.rs b/tokio-stream/src/stream_ext/all.rs similarity index 100% rename from tokio-stream/src/all.rs rename to tokio-stream/src/stream_ext/all.rs diff --git a/tokio-stream/src/any.rs b/tokio-stream/src/stream_ext/any.rs similarity index 100% rename from tokio-stream/src/any.rs rename to tokio-stream/src/stream_ext/any.rs diff --git a/tokio-stream/src/chain.rs b/tokio-stream/src/stream_ext/chain.rs similarity index 95% rename from tokio-stream/src/chain.rs rename to tokio-stream/src/stream_ext/chain.rs index cfdef83d73c..bd64f33ce4e 100644 --- a/tokio-stream/src/chain.rs +++ b/tokio-stream/src/stream_ext/chain.rs @@ -1,4 +1,5 @@ -use crate::{Fuse, Stream}; +use crate::stream_ext::Fuse; +use crate::Stream; use core::pin::Pin; use core::task::{Context, Poll}; diff --git a/tokio-stream/src/collect.rs b/tokio-stream/src/stream_ext/collect.rs similarity index 100% rename from tokio-stream/src/collect.rs rename to tokio-stream/src/stream_ext/collect.rs diff --git a/tokio-stream/src/filter.rs b/tokio-stream/src/stream_ext/filter.rs similarity index 100% rename from tokio-stream/src/filter.rs rename to tokio-stream/src/stream_ext/filter.rs diff --git a/tokio-stream/src/filter_map.rs b/tokio-stream/src/stream_ext/filter_map.rs similarity index 100% rename from tokio-stream/src/filter_map.rs rename to tokio-stream/src/stream_ext/filter_map.rs diff --git a/tokio-stream/src/fold.rs b/tokio-stream/src/stream_ext/fold.rs similarity index 100% rename from tokio-stream/src/fold.rs rename to tokio-stream/src/stream_ext/fold.rs diff --git a/tokio-stream/src/fuse.rs b/tokio-stream/src/stream_ext/fuse.rs similarity index 100% rename from tokio-stream/src/fuse.rs rename to tokio-stream/src/stream_ext/fuse.rs diff --git a/tokio-stream/src/map.rs b/tokio-stream/src/stream_ext/map.rs similarity index 100% rename from tokio-stream/src/map.rs rename to tokio-stream/src/stream_ext/map.rs diff --git a/tokio-stream/src/merge.rs b/tokio-stream/src/stream_ext/merge.rs similarity index 97% rename from tokio-stream/src/merge.rs rename to tokio-stream/src/stream_ext/merge.rs index ea0ace0e21e..9d5123c85a3 100644 --- a/tokio-stream/src/merge.rs +++ b/tokio-stream/src/stream_ext/merge.rs @@ -1,4 +1,5 @@ -use crate::{Fuse, Stream}; +use crate::stream_ext::Fuse; +use crate::Stream; use core::pin::Pin; use core::task::{Context, Poll}; diff --git a/tokio-stream/src/next.rs b/tokio-stream/src/stream_ext/next.rs similarity index 100% rename from tokio-stream/src/next.rs rename to tokio-stream/src/stream_ext/next.rs diff --git a/tokio-stream/src/skip.rs b/tokio-stream/src/stream_ext/skip.rs similarity index 100% rename from tokio-stream/src/skip.rs rename to tokio-stream/src/stream_ext/skip.rs diff --git a/tokio-stream/src/skip_while.rs b/tokio-stream/src/stream_ext/skip_while.rs similarity index 100% rename from tokio-stream/src/skip_while.rs rename to tokio-stream/src/stream_ext/skip_while.rs diff --git a/tokio-stream/src/take.rs b/tokio-stream/src/stream_ext/take.rs similarity index 100% rename from tokio-stream/src/take.rs rename to tokio-stream/src/stream_ext/take.rs diff --git a/tokio-stream/src/take_while.rs b/tokio-stream/src/stream_ext/take_while.rs similarity index 100% rename from tokio-stream/src/take_while.rs rename to tokio-stream/src/stream_ext/take_while.rs diff --git a/tokio-stream/src/throttle.rs b/tokio-stream/src/stream_ext/throttle.rs similarity index 100% rename from tokio-stream/src/throttle.rs rename to tokio-stream/src/stream_ext/throttle.rs diff --git a/tokio-stream/src/timeout.rs b/tokio-stream/src/stream_ext/timeout.rs similarity index 97% rename from tokio-stream/src/timeout.rs rename to tokio-stream/src/stream_ext/timeout.rs index 9ebbaa2332c..de17dc0091f 100644 --- a/tokio-stream/src/timeout.rs +++ b/tokio-stream/src/stream_ext/timeout.rs @@ -1,4 +1,5 @@ -use crate::{Fuse, Stream}; +use crate::stream_ext::Fuse; +use crate::Stream; use tokio::time::{Instant, Sleep}; use core::future::Future; diff --git a/tokio-stream/src/try_next.rs b/tokio-stream/src/stream_ext/try_next.rs similarity index 95% rename from tokio-stream/src/try_next.rs rename to tokio-stream/src/stream_ext/try_next.rs index e91e8a429b5..af27d87d8e9 100644 --- a/tokio-stream/src/try_next.rs +++ b/tokio-stream/src/stream_ext/try_next.rs @@ -1,4 +1,5 @@ -use crate::{Next, Stream}; +use crate::stream_ext::Next; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; From df2d4910b13875410d98fb19d7049560c6781361 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 17:12:10 +0100 Subject: [PATCH 2/9] stream: add wrappers for Tokio types --- tokio-stream/Cargo.toml | 5 +- tokio-stream/src/lib.rs | 2 + tokio-stream/src/macros.rs | 30 +++++++++++ tokio-stream/src/wrappers.rs | 33 ++++++++++++ tokio-stream/src/wrappers/interval.rs | 38 +++++++++++++ tokio-stream/src/wrappers/lines.rs | 42 +++++++++++++++ tokio-stream/src/wrappers/mpsc_bounded.rs | 59 +++++++++++++++++++++ tokio-stream/src/wrappers/mpsc_unbounded.rs | 53 ++++++++++++++++++ tokio-stream/src/wrappers/read_dir.rs | 35 ++++++++++++ tokio-stream/src/wrappers/split.rs | 42 +++++++++++++++ tokio-stream/src/wrappers/tcp_listener.rs | 42 +++++++++++++++ tokio-stream/src/wrappers/unix_listener.rs | 42 +++++++++++++++ 12 files changed, 422 insertions(+), 1 deletion(-) create mode 100644 tokio-stream/src/wrappers.rs create mode 100644 tokio-stream/src/wrappers/interval.rs create mode 100644 tokio-stream/src/wrappers/lines.rs create mode 100644 tokio-stream/src/wrappers/mpsc_bounded.rs create mode 100644 tokio-stream/src/wrappers/mpsc_unbounded.rs create mode 100644 tokio-stream/src/wrappers/read_dir.rs create mode 100644 tokio-stream/src/wrappers/split.rs create mode 100644 tokio-stream/src/wrappers/tcp_listener.rs create mode 100644 tokio-stream/src/wrappers/unix_listener.rs diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 6d28588233e..658d612ef90 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -22,6 +22,9 @@ categories = ["asynchronous"] [features] default = ["time"] time = ["tokio/time"] +net = ["tokio/net"] +io-util = ["tokio/io-util"] +fs = ["tokio/fs"] [dependencies] futures-core = { version = "0.3.0" } @@ -34,4 +37,4 @@ tokio = { version = "1.0", path = "../tokio", features = ["full"] } tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -proptest = "0.10.0" \ No newline at end of file +proptest = "0.10.0" diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index 3c6d84ee469..a6d6f22ead5 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -81,6 +81,8 @@ #[macro_use] mod macros; +pub mod wrappers; + mod stream_ext; pub use stream_ext::{collect::FromStream, StreamExt}; diff --git a/tokio-stream/src/macros.rs b/tokio-stream/src/macros.rs index 0d493a85119..39ad86cc5d8 100644 --- a/tokio-stream/src/macros.rs +++ b/tokio-stream/src/macros.rs @@ -1,3 +1,33 @@ +macro_rules! cfg_fs { + ($($item:item)*) => { + $( + #[cfg(feature = "fs")] + #[cfg_attr(docsrs, doc(cfg(feature = "fs")))] + $item + )* + } +} + +macro_rules! cfg_io_util { + ($($item:item)*) => { + $( + #[cfg(feature = "io-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + $item + )* + } +} + +macro_rules! cfg_net { + ($($item:item)*) => { + $( + #[cfg(feature = "net")] + #[cfg_attr(docsrs, doc(cfg(feature = "net")))] + $item + )* + } +} + macro_rules! cfg_time { ($($item:item)*) => { $( diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs new file mode 100644 index 00000000000..41fbb127ac5 --- /dev/null +++ b/tokio-stream/src/wrappers.rs @@ -0,0 +1,33 @@ +//! Wrappers for Tokio types that implement `Stream`. + +mod mpsc_bounded; +pub use mpsc_bounded::ReceiverStream; + +mod mpsc_unbounded; +pub use mpsc_unbounded::UnboundedReceiverStream; + +cfg_time! { + mod interval; + pub use interval::IntervalStream; +} + +cfg_net! { + mod tcp_listener; + pub use tcp_listener::TcpListenerStream; + + mod unix_listener; + pub use unix_listener::UnixListenerStream; +} + +cfg_io_util! { + mod split; + pub use split::SplitStream; + + mod lines; + pub use lines::LinesStream; +} + +cfg_fs! { + mod read_dir; + pub use read_dir::ReadDirStream; +} diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs new file mode 100644 index 00000000000..38cdbed5d79 --- /dev/null +++ b/tokio-stream/src/wrappers/interval.rs @@ -0,0 +1,38 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{Instant, Interval}; + +/// A wrapper around [`Interval`] that implements [`Stream`]. +/// +/// [`Interval`]: struct@tokio::time::Interval +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "time")))] +pub struct IntervalStream { + inner: Interval, +} + +impl IntervalStream { + /// Create a new `IntervalStream`. + pub fn new(interval: Interval) -> Self { + Self { inner: interval } + } + + /// Get back the inner `Interval`. + pub fn into_inner(self) -> Interval { + self.inner + } +} + +impl Stream for IntervalStream { + type Item = Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_tick(cx).map(|inner| Some(inner)) + } + + fn size_hint(&self) -> (usize, Option) { + (std::usize::MAX, None) + } +} diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs new file mode 100644 index 00000000000..e4df130c6f3 --- /dev/null +++ b/tokio-stream/src/wrappers/lines.rs @@ -0,0 +1,42 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Lines}; + +pin_project! { + /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. + /// + /// [`tokio::io::Lines`]: struct@tokio::io::Lines + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct LinesStream { + #[pin] + inner: Lines, + } +} + +impl LinesStream { + /// Create a new `LinesStream`. + pub fn new(lines: Lines) -> Self { + Self { inner: lines } + } + + /// Get back the inner `Lines`. + pub fn into_inner(self) -> Lines { + self.inner + } +} + +impl Stream for LinesStream { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .inner + .poll_next_line(cx) + .map(Result::transpose) + } +} diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs new file mode 100644 index 00000000000..abb62650b53 --- /dev/null +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -0,0 +1,59 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::Receiver; + +/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct ReceiverStream { + inner: Receiver, +} + +impl ReceiverStream { + /// Create a new `ReceiverStream`. + pub fn new(recv: Receiver) -> Self { + Self { inner: recv } + } + + /// Get back the inner `Receiver`. + pub fn into_inner(self) -> Receiver { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. Any + /// outstanding [`Permit`] values will still be able to send messages. + /// + /// To guarantee no messages are dropped, after calling `close()`, you must + /// receive all items from the stream until `None` is returned. + /// + /// [`Permit`]: struct@tokio::sync::mpsc::Permit + pub fn close(&mut self) { + self.inner.close() + } + + /// Access the inner `Receiver` mutably. This method can be used to e.g. + /// call the async or blocking version of receive instead of using the + /// `Stream` impl. + pub fn as_mut(&mut self) -> &mut Receiver { + &mut self.inner + } + + /// Access the inner `Receiver` immutably. + pub fn as_ref(&self) -> &Receiver { + &self.inner + } +} + +impl Stream for ReceiverStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_recv(cx) + } +} diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs new file mode 100644 index 00000000000..1f9e5f5d960 --- /dev/null +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -0,0 +1,53 @@ +use crate::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::UnboundedReceiver; + +/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +pub struct UnboundedReceiverStream { + inner: UnboundedReceiver, +} + +impl UnboundedReceiverStream { + /// Create a new `UnboundedReceiverStream`. + pub fn new(recv: UnboundedReceiver) -> Self { + Self { inner: recv } + } + + /// Get back the inner `UnboundedReceiver`. + pub fn into_inner(self) -> UnboundedReceiver { + self.inner + } + + /// Closes the receiving half of a channel without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.inner.close() + } + + /// Access the inner `UnboundedReceiver` mutably. This method can be used to + /// e.g. call the async or blocking version of receive instead of using the + /// `Stream` impl. + pub fn as_mut(&mut self) -> &mut UnboundedReceiver { + &mut self.inner + } + + /// Access the inner `UnboundedReceiver` immutably. + pub fn as_ref(&self) -> &UnboundedReceiver { + &self.inner + } +} + +impl Stream for UnboundedReceiverStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_recv(cx) + } +} diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs new file mode 100644 index 00000000000..a1fc7a1e9f6 --- /dev/null +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -0,0 +1,35 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::{DirEntry, ReadDir}; + +/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`]. +/// +/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "fs")))] +pub struct ReadDirStream { + inner: ReadDir, +} + +impl ReadDirStream { + /// Create a new `ReadDirStream`. + pub fn new(read_dir: ReadDir) -> Self { + Self { inner: read_dir } + } + + /// Get back the inner `ReadDir`. + pub fn into_inner(self) -> ReadDir { + self.inner + } +} + +impl Stream for ReadDirStream { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_next_entry(cx).map(Result::transpose) + } +} diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs new file mode 100644 index 00000000000..d556bbaf11a --- /dev/null +++ b/tokio-stream/src/wrappers/split.rs @@ -0,0 +1,42 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, Split}; + +pin_project! { + /// A wrapper around [`tokio::io::Split`] that implements [`Stream`]. + /// + /// [`tokio::io::Split`]: struct@tokio::io::Split + /// [`Stream`]: trait@crate::Stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct SplitStream { + #[pin] + inner: Split, + } +} + +impl SplitStream { + /// Create a new `SplitStream`. + pub fn new(split: Split) -> Self { + Self { inner: split } + } + + /// Get back the inner `Split`. + pub fn into_inner(self) -> Split { + self.inner + } +} + +impl Stream for SplitStream { + type Item = io::Result>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .inner + .poll_next_segment(cx) + .map(Result::transpose) + } +} diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs new file mode 100644 index 00000000000..0f8fa9387cc --- /dev/null +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -0,0 +1,42 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{TcpListener, TcpStream}; + +/// A wrapper around [`TcpListener`] that implements [`Stream`]. +/// +/// [`TcpListener`]: struct@tokio::net::TcpListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +pub struct TcpListenerStream { + inner: TcpListener, +} + +impl TcpListenerStream { + /// Create a new `TcpListenerStream`. + pub fn new(listener: TcpListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `TcpListener`. + pub fn into_inner(self) -> TcpListener { + self.inner + } +} + +impl Stream for TcpListenerStream { + type Item = io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs new file mode 100644 index 00000000000..d40d5b2f8b8 --- /dev/null +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -0,0 +1,42 @@ +use crate::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::{UnixListener, UnixStream}; + +/// A wrapper around [`UnixListener`] that implements [`Stream`]. +/// +/// [`UnixListener`]: struct@tokio::net::UnixListener +/// [`Stream`]: trait@crate::Stream +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +pub struct UnixListenerStream { + inner: UnixListener, +} + +impl UnixListenerStream { + /// Create a new `UnixListenerStream`. + pub fn new(listener: UnixListener) -> Self { + Self { inner: listener } + } + + /// Get back the inner `UnixListener`. + pub fn into_inner(self) -> UnixListener { + self.inner + } +} + +impl Stream for UnixListenerStream { + type Item = io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} From df3a3ec86ce2c065ecbd91813096d5b3163a362a Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 17:35:19 +0100 Subject: [PATCH 3/9] tokio: add links to wrapper types --- tokio/src/fs/read_dir.rs | 13 ++++++++----- tokio/src/io/util/lines.rs | 10 +++++++++- tokio/src/io/util/split.rs | 6 +++++- tokio/src/net/tcp/listener.rs | 4 ++++ tokio/src/net/unix/listener.rs | 4 ++++ tokio/src/sync/mpsc/bounded.rs | 4 ++++ tokio/src/sync/mpsc/unbounded.rs | 4 ++++ tokio/src/time/interval.rs | 11 ++++++++++- 8 files changed, 48 insertions(+), 8 deletions(-) diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 7b21c9ccec0..41aef0c37bb 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -20,12 +20,15 @@ pub async fn read_dir(path: impl AsRef) -> io::Result { Ok(ReadDir(State::Idle(Some(std)))) } -/// Stream of the entries in a directory. +/// Read the the entries in a directory. /// -/// This stream is returned from the [`read_dir`] function of this module and -/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] -/// information like the entry's path and possibly other metadata can be -/// learned. +/// This struct is returned from the [`read_dir`] function of this module and +/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information +/// like the entry's path and possibly other metadata can be learned. +/// +/// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`]. +/// +/// [`ReadDirStream`]: ../../tokio_stream/wrappers/struct.ReadDirStream.html /// /// # Errors /// diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 25df78e99f1..558acc0779c 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -8,7 +8,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method. + /// Read lines from an [`AsyncBufRead`]. + /// + /// A `Lines` can be turned into a `Stream` with [`LinesStream`]. + /// + /// This type is usually created using the [`lines`] method. + /// + /// [`AsyncBufRead`]: crate::io::AsyncBufRead + /// [`LinesStream`]: ../../tokio_stream/wrappers/struct.LinesStream.html + /// [`lines`]: crate::io::AsyncBufReadExt::lines #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index eb828659e08..3c4d90fc4f8 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -8,7 +8,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method. + /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method. + /// + /// A `Split` can be turned into a `Stream` with [`SplitStream`]. + /// + /// [`SplitStream`]: ../../tokio_stream/wrappers/struct.SplitStream.html #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index a2a8637ecff..4543b806a49 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -14,6 +14,10 @@ cfg_net! { /// You can accept a new connection by using the [`accept`](`TcpListener::accept`) /// method. /// + /// A `TcpListener` can be turned into a `Stream` with [`TcpListenerStream`]. + /// + /// [`TcpListenerStream`]: ../../tokio_stream/wrappers/struct.TcpListenerStream.html + /// /// # Errors /// /// Note that accepting a connection can lead to various errors and not all diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 9ed4ce175b0..51ffe5ccf20 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -14,6 +14,10 @@ cfg_net_unix! { /// /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. /// + /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`]. + /// + /// [`UnixListenerStream`]: ../../tokio_stream/wrappers/struct.UnixListenerStream.html + /// /// # Errors /// /// Note that accepting a connection can lead to various errors and not all diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 2dae7e26fe4..12b910f7602 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -33,6 +33,10 @@ pub struct Permit<'a, T> { /// Receive values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. +/// +/// This receiver can be turned into a `Stream` using [`ReceiverStream`]. +/// +/// [`ReceiverStream`]: ../../../tokio_stream/wrappers/struct.ReceiverStream.html pub struct Receiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 38953b8f978..688d5052f8b 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -33,6 +33,10 @@ impl fmt::Debug for UnboundedSender { /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. +/// +/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. +/// +/// [`UnboundedReceiverStream`]: ../../../tokio_stream/wrappers/struct.UnboundedReceiverStream.html pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index be93ba12c69..874f437fab5 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -106,7 +106,16 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { } } -/// Stream returned by [`interval`](interval) and [`interval_at`](interval_at). +/// Interval returned by [`interval`](interval) and [`interval_at`](interval_at). +/// +/// This type allows you to wait on a sequence of instants with a certain +/// duration between each instant. Unlike calling [`sleep`](crate::time::sleep) +/// in a loop, this lets you count the time spent between the calls to `sleep` +/// as well. +/// +/// An `Interval` can be turned into a `Stream` with [`IntervalStream`]. +/// +/// [`IntervalStream`]: ../../tokio_stream/wrappers/struct.IntervalStream.html #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value. From 8904c54b7a8e46c529a0985a15928ddcda5a03ca Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 17:38:27 +0100 Subject: [PATCH 4/9] clippy --- tokio-stream/src/wrappers/interval.rs | 2 +- tokio-stream/src/wrappers/mpsc_bounded.rs | 25 +++++++++++---------- tokio-stream/src/wrappers/mpsc_unbounded.rs | 24 ++++++++++---------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index 38cdbed5d79..cab09a91bfc 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -29,7 +29,7 @@ impl Stream for IntervalStream { type Item = Instant; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_tick(cx).map(|inner| Some(inner)) + self.inner.poll_tick(cx).map(Some) } fn size_hint(&self) -> (usize, Option) { diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index abb62650b53..3f5426cad1d 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -1,6 +1,7 @@ use crate::Stream; use std::pin::Pin; use std::task::{Context, Poll}; +use std::convert::{AsRef, AsMut}; use tokio::sync::mpsc::Receiver; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. @@ -36,18 +37,6 @@ impl ReceiverStream { pub fn close(&mut self) { self.inner.close() } - - /// Access the inner `Receiver` mutably. This method can be used to e.g. - /// call the async or blocking version of receive instead of using the - /// `Stream` impl. - pub fn as_mut(&mut self) -> &mut Receiver { - &mut self.inner - } - - /// Access the inner `Receiver` immutably. - pub fn as_ref(&self) -> &Receiver { - &self.inner - } } impl Stream for ReceiverStream { @@ -57,3 +46,15 @@ impl Stream for ReceiverStream { self.inner.poll_recv(cx) } } + +impl AsRef> for ReceiverStream { + fn as_ref(&self) -> &Receiver { + &self.inner + } +} + +impl AsMut> for ReceiverStream { + fn as_mut(&mut self) -> &mut Receiver { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index 1f9e5f5d960..bc5f40cdc9f 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -30,18 +30,6 @@ impl UnboundedReceiverStream { pub fn close(&mut self) { self.inner.close() } - - /// Access the inner `UnboundedReceiver` mutably. This method can be used to - /// e.g. call the async or blocking version of receive instead of using the - /// `Stream` impl. - pub fn as_mut(&mut self) -> &mut UnboundedReceiver { - &mut self.inner - } - - /// Access the inner `UnboundedReceiver` immutably. - pub fn as_ref(&self) -> &UnboundedReceiver { - &self.inner - } } impl Stream for UnboundedReceiverStream { @@ -51,3 +39,15 @@ impl Stream for UnboundedReceiverStream { self.inner.poll_recv(cx) } } + +impl AsRef> for UnboundedReceiverStream { + fn as_ref(&self) -> &UnboundedReceiver { + &self.inner + } +} + +impl AsMut> for UnboundedReceiverStream { + fn as_mut(&mut self) -> &mut UnboundedReceiver { + &mut self.inner + } +} From 68ad8e34ee3edd4ebc74a99adb4ccc68b83476a9 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 17:56:49 +0100 Subject: [PATCH 5/9] Add more AsRef/AsMut impls --- tokio-stream/src/wrappers/interval.rs | 13 +++++++++++++ tokio-stream/src/wrappers/lines.rs | 13 +++++++++++++ tokio-stream/src/wrappers/mpsc_bounded.rs | 2 +- tokio-stream/src/wrappers/mpsc_unbounded.rs | 1 + tokio-stream/src/wrappers/read_dir.rs | 13 +++++++++++++ tokio-stream/src/wrappers/split.rs | 13 +++++++++++++ tokio-stream/src/wrappers/tcp_listener.rs | 13 +++++++++++++ tokio-stream/src/wrappers/unix_listener.rs | 13 +++++++++++++ 8 files changed, 80 insertions(+), 1 deletion(-) diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index cab09a91bfc..7e516a23d9c 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -1,4 +1,5 @@ use crate::Stream; +use std::convert::{AsMut, AsRef}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::time::{Instant, Interval}; @@ -36,3 +37,15 @@ impl Stream for IntervalStream { (std::usize::MAX, None) } } + +impl AsRef for IntervalStream { + fn as_ref(&self) -> &Interval { + &self.inner + } +} + +impl AsMut for IntervalStream { + fn as_mut(&mut self) -> &mut Interval { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index e4df130c6f3..3562db6e51e 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -1,5 +1,6 @@ use crate::Stream; use pin_project_lite::pin_project; +use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -40,3 +41,15 @@ impl Stream for LinesStream { .map(Result::transpose) } } + +impl AsRef> for LinesStream { + fn as_ref(&self) -> &Lines { + &self.inner + } +} + +impl AsMut> for LinesStream { + fn as_mut(&mut self) -> &mut Lines { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index 3f5426cad1d..f843c4a2db9 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -1,7 +1,7 @@ use crate::Stream; +use std::convert::{AsMut, AsRef}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::convert::{AsRef, AsMut}; use tokio::sync::mpsc::Receiver; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index bc5f40cdc9f..0cea4d31869 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -1,4 +1,5 @@ use crate::Stream; +use std::convert::{AsMut, AsRef}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedReceiver; diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index a1fc7a1e9f6..014e4a956bd 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -1,4 +1,5 @@ use crate::Stream; +use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -33,3 +34,15 @@ impl Stream for ReadDirStream { self.inner.poll_next_entry(cx).map(Result::transpose) } } + +impl AsRef for ReadDirStream { + fn as_ref(&self) -> &ReadDir { + &self.inner + } +} + +impl AsMut for ReadDirStream { + fn as_mut(&mut self) -> &mut ReadDir { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs index d556bbaf11a..5a6801f381c 100644 --- a/tokio-stream/src/wrappers/split.rs +++ b/tokio-stream/src/wrappers/split.rs @@ -1,5 +1,6 @@ use crate::Stream; use pin_project_lite::pin_project; +use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -40,3 +41,15 @@ impl Stream for SplitStream { .map(Result::transpose) } } + +impl AsRef> for SplitStream { + fn as_ref(&self) -> &Split { + &self.inner + } +} + +impl AsMut> for SplitStream { + fn as_mut(&mut self) -> &mut Split { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index 0f8fa9387cc..71ec64f8ec2 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -1,4 +1,5 @@ use crate::Stream; +use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -40,3 +41,15 @@ impl Stream for TcpListenerStream { } } } + +impl AsRef for TcpListenerStream { + fn as_ref(&self) -> &TcpListener { + &self.inner + } +} + +impl AsMut for TcpListenerStream { + fn as_mut(&mut self) -> &mut TcpListener { + &mut self.inner + } +} diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs index d40d5b2f8b8..c331204f471 100644 --- a/tokio-stream/src/wrappers/unix_listener.rs +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -1,4 +1,5 @@ use crate::Stream; +use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -40,3 +41,15 @@ impl Stream for UnixListenerStream { } } } + +impl AsRef for UnixListenerStream { + fn as_ref(&self) -> &UnixListener { + &self.inner + } +} + +impl AsMut for UnixListenerStream { + fn as_mut(&mut self) -> &mut UnixListener { + &mut self.inner + } +} From 5a2f71bc90bd536b11fc92d90d22867bb5e5517f Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 18:13:44 +0100 Subject: [PATCH 6/9] UnixListener not available on windows --- tokio-stream/src/wrappers.rs | 2 ++ tokio-stream/src/wrappers/unix_listener.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio-stream/src/wrappers.rs b/tokio-stream/src/wrappers.rs index 41fbb127ac5..c0ffb234a09 100644 --- a/tokio-stream/src/wrappers.rs +++ b/tokio-stream/src/wrappers.rs @@ -15,7 +15,9 @@ cfg_net! { mod tcp_listener; pub use tcp_listener::TcpListenerStream; + #[cfg(unix)] mod unix_listener; + #[cfg(unix)] pub use unix_listener::UnixListenerStream; } diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs index c331204f471..11eb7aa2e48 100644 --- a/tokio-stream/src/wrappers/unix_listener.rs +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -10,7 +10,7 @@ use tokio::net::{UnixListener, UnixStream}; /// [`UnixListener`]: struct@tokio::net::UnixListener /// [`Stream`]: trait@crate::Stream #[derive(Debug)] -#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))] pub struct UnixListenerStream { inner: UnixListener, } From bad0e55af71ed13b2d6cdedb280d1df4acb9adcb Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 27 Dec 2020 16:19:17 +0100 Subject: [PATCH 7/9] Remove Asmut AsRef imports --- tokio-stream/src/wrappers/interval.rs | 1 - tokio-stream/src/wrappers/lines.rs | 1 - tokio-stream/src/wrappers/mpsc_bounded.rs | 1 - tokio-stream/src/wrappers/mpsc_unbounded.rs | 1 - tokio-stream/src/wrappers/read_dir.rs | 1 - tokio-stream/src/wrappers/split.rs | 1 - tokio-stream/src/wrappers/tcp_listener.rs | 1 - tokio-stream/src/wrappers/unix_listener.rs | 1 - 8 files changed, 8 deletions(-) diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index 7e516a23d9c..2bf0194bd0f 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -1,5 +1,4 @@ use crate::Stream; -use std::convert::{AsMut, AsRef}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::time::{Instant, Interval}; diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index 3562db6e51e..abf91d62ea9 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -1,6 +1,5 @@ use crate::Stream; use pin_project_lite::pin_project; -use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index f843c4a2db9..e4f90000985 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -1,5 +1,4 @@ use crate::Stream; -use std::convert::{AsMut, AsRef}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::Receiver; diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index 0cea4d31869..bc5f40cdc9f 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -1,5 +1,4 @@ use crate::Stream; -use std::convert::{AsMut, AsRef}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedReceiver; diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index 014e4a956bd..b5cf54f79e1 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -1,5 +1,4 @@ use crate::Stream; -use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs index 5a6801f381c..57f84b2fd7c 100644 --- a/tokio-stream/src/wrappers/split.rs +++ b/tokio-stream/src/wrappers/split.rs @@ -1,6 +1,5 @@ use crate::Stream; use pin_project_lite::pin_project; -use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index 71ec64f8ec2..ce7cb163507 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -1,5 +1,4 @@ use crate::Stream; -use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs index 11eb7aa2e48..0beba588c20 100644 --- a/tokio-stream/src/wrappers/unix_listener.rs +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -1,5 +1,4 @@ use crate::Stream; -use std::convert::{AsMut, AsRef}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; From 8754aecd99a30de305e68a2171ec54c7185f638f Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 27 Dec 2020 16:36:15 +0100 Subject: [PATCH 8/9] Update links to docs.rs --- tokio/src/fs/read_dir.rs | 2 +- tokio/src/io/util/lines.rs | 2 +- tokio/src/io/util/split.rs | 2 +- tokio/src/net/tcp/listener.rs | 2 +- tokio/src/net/unix/listener.rs | 2 +- tokio/src/sync/mpsc/bounded.rs | 2 +- tokio/src/sync/mpsc/unbounded.rs | 2 +- tokio/src/time/interval.rs | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 41aef0c37bb..aedaf7b921e 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -28,7 +28,7 @@ pub async fn read_dir(path: impl AsRef) -> io::Result { /// /// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`]. /// -/// [`ReadDirStream`]: ../../tokio_stream/wrappers/struct.ReadDirStream.html +/// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html /// /// # Errors /// diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 558acc0779c..ed6a944409e 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -15,7 +15,7 @@ pin_project! { /// This type is usually created using the [`lines`] method. /// /// [`AsyncBufRead`]: crate::io::AsyncBufRead - /// [`LinesStream`]: ../../tokio_stream/wrappers/struct.LinesStream.html + /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html /// [`lines`]: crate::io::AsyncBufReadExt::lines #[derive(Debug)] #[must_use = "streams do nothing unless polled"] diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index 3c4d90fc4f8..4f3ce4eff02 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -12,7 +12,7 @@ pin_project! { /// /// A `Split` can be turned into a `Stream` with [`SplitStream`]. /// - /// [`SplitStream`]: ../../tokio_stream/wrappers/struct.SplitStream.html + /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 4543b806a49..a13f1d34258 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -16,7 +16,7 @@ cfg_net! { /// /// A `TcpListener` can be turned into a `Stream` with [`TcpListenerStream`]. /// - /// [`TcpListenerStream`]: ../../tokio_stream/wrappers/struct.TcpListenerStream.html + /// [`TcpListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.TcpListenerStream.html /// /// # Errors /// diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 51ffe5ccf20..d1c063e729f 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -16,7 +16,7 @@ cfg_net_unix! { /// /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`]. /// - /// [`UnixListenerStream`]: ../../tokio_stream/wrappers/struct.UnixListenerStream.html + /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html /// /// # Errors /// diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 12b910f7602..985167ec565 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -36,7 +36,7 @@ pub struct Permit<'a, T> { /// /// This receiver can be turned into a `Stream` using [`ReceiverStream`]. /// -/// [`ReceiverStream`]: ../../../tokio_stream/wrappers/struct.ReceiverStream.html +/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html pub struct Receiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 688d5052f8b..29a0a29719e 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -36,7 +36,7 @@ impl fmt::Debug for UnboundedSender { /// /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. /// -/// [`UnboundedReceiverStream`]: ../../../tokio_stream/wrappers/struct.UnboundedReceiverStream.html +/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 874f437fab5..20cfceccce3 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -115,7 +115,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { /// /// An `Interval` can be turned into a `Stream` with [`IntervalStream`]. /// -/// [`IntervalStream`]: ../../tokio_stream/wrappers/struct.IntervalStream.html +/// [`IntervalStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.IntervalStream.html #[derive(Debug)] pub struct Interval { /// Future that completes the next time the `Interval` yields a value. From 47e10ecebc4bda6a35ad32a85b73a1028643f30e Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 4 Jan 2021 17:35:42 +0100 Subject: [PATCH 9/9] add as_pin_mut for non Unpin wrappers --- tokio-stream/src/wrappers/lines.rs | 5 +++++ tokio-stream/src/wrappers/split.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index abf91d62ea9..4850429a72d 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -28,6 +28,11 @@ impl LinesStream { pub fn into_inner(self) -> Lines { self.inner } + + /// Obtain a pinned reference to the inner `Lines`. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines> { + self.project().inner + } } impl Stream for LinesStream { diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs index 57f84b2fd7c..ac46a8ba6ff 100644 --- a/tokio-stream/src/wrappers/split.rs +++ b/tokio-stream/src/wrappers/split.rs @@ -28,6 +28,11 @@ impl SplitStream { pub fn into_inner(self) -> Split { self.inner } + + /// Obtain a pinned reference to the inner `Split`. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split> { + self.project().inner + } } impl Stream for SplitStream {