diff --git a/futures-channel/src/oneshot.rs b/futures-channel/src/oneshot.rs index 8d160066fb..47d3ada853 100644 --- a/futures-channel/src/oneshot.rs +++ b/futures-channel/src/oneshot.rs @@ -1,4 +1,6 @@ //! A channel for sending a single message between asynchronous tasks. +//! +//! This is a single-producer, single-consumer channel. use alloc::sync::Arc; use core::fmt; @@ -12,7 +14,7 @@ use crate::lock::Lock; /// A future for a value that will be provided by another asynchronous task. /// -/// This is created by the [`channel`] function. +/// This is created by the [`channel`](channel) function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Receiver { @@ -21,7 +23,7 @@ pub struct Receiver { /// A means of transmitting a single value to another task. /// -/// This is created by the [`channel`] function. +/// This is created by the [`channel`](channel) function. #[derive(Debug)] pub struct Sender { inner: Arc>, @@ -68,7 +70,9 @@ struct Inner { tx_task: Lock>, } -/// Creates a new one-shot channel for sending values across asynchronous tasks. +/// Creates a new one-shot channel for sending a single value across asynchronous tasks. +/// +/// The channel works for a spsc (single-producer, single-consumer) scheme. /// /// This function is similar to Rust's channel constructor found in the standard /// library. Two halves are returned, the first of which is a `Sender` handle, @@ -337,14 +341,13 @@ impl Sender { /// /// If the value is successfully enqueued for the remote end to receive, /// then `Ok(())` is returned. If the receiving end was dropped before - /// this function was called, however, then `Err` is returned with the value - /// provided. + /// this function was called, however, then `Err(t)` is returned. pub fn send(self, t: T) -> Result<(), T> { self.inner.send(t) } /// Polls this `Sender` half to detect whether its associated - /// [`Receiver`](Receiver) with has been dropped. + /// [`Receiver`](Receiver) has been dropped. /// /// # Return values /// diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index 9de6b0f51b..f5d5dd2667 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -3,11 +3,14 @@ //! This module contains a number of functions for working with `Future`s, //! including the `FutureExt` trait which adds methods to `Future` types. -use super::{assert_future, Either}; #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::pin::Pin; +use crate::future::{assert_future, Either}; +use crate::stream::assert_stream; +use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; +use crate::never::Never; #[cfg(feature = "alloc")] use futures_core::future::{BoxFuture, LocalBoxFuture}; use futures_core::{ @@ -15,8 +18,6 @@ use futures_core::{ stream::Stream, task::{Context, Poll}, }; -use crate::never::Never; -use crate::fns::{OkFn, ok_fn, IntoFn, into_fn, InspectFn, inspect_fn}; use pin_utils::pin_mut; // Combinators @@ -223,7 +224,7 @@ pub trait FutureExt: Future { B: Future, Self: Sized, { - Either::Left(self) + assert_future::(Either::Left(self)) } /// Wrap this future in an `Either` future, making it the right-hand variant @@ -253,7 +254,7 @@ pub trait FutureExt: Future { A: Future, Self: Sized, { - Either::Right(self) + assert_future::(Either::Right(self)) } /// Convert this future into a single element stream. @@ -278,7 +279,7 @@ pub trait FutureExt: Future { where Self: Sized, { - IntoStream::new(self) + assert_stream::(IntoStream::new(self)) } /// Flatten the execution of this future when the output of this @@ -342,7 +343,7 @@ pub trait FutureExt: Future { Self::Output: Stream, Self: Sized, { - FlattenStream::new(self) + assert_stream::<::Item, _>(FlattenStream::new(self)) } /// Fuse a future such that `poll` will never again be called once it has @@ -431,7 +432,9 @@ pub trait FutureExt: Future { where Self: Sized + ::std::panic::UnwindSafe, { - CatchUnwind::new(self) + assert_future::>, _>(CatchUnwind::new( + self, + )) } /// Create a cloneable handle to this future where all handles will resolve @@ -485,7 +488,7 @@ pub trait FutureExt: Future { Self: Sized, Self::Output: Clone, { - Shared::new(self) + assert_future::(Shared::new(self)) } /// Turn this future into a future that yields `()` on completion and sends @@ -515,7 +518,7 @@ pub trait FutureExt: Future { where Self: Sized + Send + 'a, { - Box::pin(self) + assert_future::(Box::pin(self)) } /// Wrap the future in a Box, pinning it. @@ -529,7 +532,7 @@ pub trait FutureExt: Future { where Self: Sized + 'a, { - Box::pin(self) + assert_future::(Box::pin(self)) } /// Turns a [`Future`](Future) into a @@ -538,7 +541,7 @@ pub trait FutureExt: Future { where Self: Sized, { - UnitError::new(self) + assert_future::, _>(UnitError::new(self)) } /// Turns a [`Future`](Future) into a @@ -547,7 +550,7 @@ pub trait FutureExt: Future { where Self: Sized, { - NeverError::new(self) + assert_future::, _>(NeverError::new(self)) } /// A convenience for calling `Future::poll` on `Unpin` future types. diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index f957c54089..3f19c195ed 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -109,7 +109,7 @@ cfg_target_has_atomic! { // Just a helper function to ensure the futures we're returning all have the // right implementations. -fn assert_future(future: F) -> F +pub(crate) fn assert_future(future: F) -> F where F: Future, { diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index 0732b91da9..1ce01d2de0 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -14,13 +14,13 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; -use super::assert_future; -use crate::future::{Map, Inspect}; use crate::fns::{ - MapOkFn, map_ok_fn, MapErrFn, map_err_fn, MapOkOrElseFn, - map_ok_or_else_fn, IntoFn, UnwrapOrElseFn, unwrap_or_else_fn, InspectOkFn, inspect_ok_fn, InspectErrFn, - inspect_err_fn, into_fn + inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn, + unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn, + UnwrapOrElseFn, }; +use crate::future::{assert_future, Inspect, Map}; +use crate::stream::assert_stream; // Combinators mod into_future; @@ -230,7 +230,7 @@ pub trait TryFutureExt: TryFuture { /// The provided closure `f` will only be called if this future is resolved /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then /// the provided closure will never be invoked. - /// + /// /// The provided closure `e` will only be called if this future is resolved /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then /// the provided closure will never be invoked. @@ -247,13 +247,13 @@ pub trait TryFutureExt: TryFuture { /// let future = async { Ok::(5) }; /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); /// assert_eq!(future.await, 8); - /// + /// /// let future = async { Err::(5) }; /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); /// assert_eq!(future.await, 10); /// # }); /// ``` - /// + /// fn map_ok_or_else(self, e: E, f: F) -> MapOkOrElse where F: FnOnce(Self::Ok) -> T, @@ -534,7 +534,9 @@ pub trait TryFutureExt: TryFuture { Self::Ok: TryStream, Self: Sized, { - TryFlattenStream::new(self) + assert_stream::::Ok, Self::Error>, _>(TryFlattenStream::new( + self, + )) } /// Unwraps this future's ouput, producing a future with this future's @@ -603,7 +605,7 @@ pub trait TryFutureExt: TryFuture { where Self: Sized, { - IntoFuture::new(self) + assert_future::, _>(IntoFuture::new(self)) } /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index b95edd51be..ca9bc89ace 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -100,3 +100,12 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; } + +// Just a helper function to ensure the futures we're returning all have the +// right implementations. +pub(crate) fn assert_stream(stream: S) -> S + where + S: Stream, +{ + stream +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index fe702a6d2b..f988468f0b 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -3,7 +3,7 @@ //! This module contains a number of functions for working with `Stream`s, //! including the `StreamExt` trait which adds methods to `Stream` types. -use crate::future::Either; +use crate::future::{assert_future, Either}; #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::pin::Pin; @@ -193,6 +193,7 @@ mod catch_unwind; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::catch_unwind::CatchUnwind; +use crate::stream::assert_stream; impl StreamExt for T where T: Stream {} @@ -226,7 +227,7 @@ pub trait StreamExt: Stream { where Self: Unpin, { - Next::new(self) + assert_future::, _>(Next::new(self)) } /// Converts this stream into a future of `(next_item, tail_of_stream)`. @@ -261,7 +262,7 @@ pub trait StreamExt: Stream { where Self: Sized + Unpin, { - StreamFuture::new(self) + assert_future::<(Option, Self), _>(StreamFuture::new(self)) } /// Maps this stream's items to a different type, returning a new stream of @@ -292,7 +293,7 @@ pub trait StreamExt: Stream { F: FnMut(Self::Item) -> T, Self: Sized, { - Map::new(self, f) + assert_stream::(Map::new(self, f)) } /// Creates a stream which gives the current iteration count as well as @@ -337,7 +338,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Enumerate::new(self) + assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) } /// Filters the values produced by this stream according to the provided @@ -372,7 +373,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Filter::new(self, f) + assert_stream::(Filter::new(self, f)) } /// Filters the values produced by this stream while simultaneously mapping @@ -406,7 +407,7 @@ pub trait StreamExt: Stream { Fut: Future>, Self: Sized, { - FilterMap::new(self, f) + assert_stream::(FilterMap::new(self, f)) } /// Computes from this stream's items new items of a different type using @@ -437,7 +438,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Then::new(self, f) + assert_stream::(Then::new(self, f)) } /// Transforms a stream into a collection, returning a @@ -469,7 +470,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Collect::new(self) + assert_future::(Collect::new(self)) } /// Concatenate all items of a stream into a single extendable @@ -509,7 +510,7 @@ pub trait StreamExt: Stream { Self: Sized, Self::Item: Extend<<::Item as IntoIterator>::Item> + IntoIterator + Default, { - Concat::new(self) + assert_future::(Concat::new(self)) } /// Execute an accumulating asynchronous computation over a stream, @@ -538,7 +539,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Fold::new(self, f, init) + assert_future::(Fold::new(self, f, init)) } /// Flattens a stream of streams into just one continuous stream. @@ -577,7 +578,7 @@ pub trait StreamExt: Stream { Self::Item: Stream, Self: Sized, { - Flatten::new(self) + assert_stream::<::Item, _>(Flatten::new(self)) } /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. @@ -675,7 +676,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - SkipWhile::new(self, f) + assert_stream::(SkipWhile::new(self, f)) } /// Take elements from this stream while the provided asynchronous predicate @@ -705,7 +706,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - TakeWhile::new(self, f) + assert_stream::(TakeWhile::new(self, f)) } /// Take elements from this stream until the provided future resolves. @@ -791,7 +792,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - ForEach::new(self, f) + assert_future::<(), _>(ForEach::new(self, f)) } /// Runs this stream to completion, executing the provided asynchronous @@ -851,7 +852,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - ForEachConcurrent::new(self, limit.into(), f) + assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) } /// Creates a new stream of at most `n` items of the underlying stream. @@ -874,7 +875,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Take::new(self, n) + assert_stream::(Take::new(self, n)) } /// Creates a new stream which skips `n` items of the underlying stream. @@ -897,7 +898,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Skip::new(self, n) + assert_stream::(Skip::new(self, n)) } /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never @@ -943,7 +944,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Fuse::new(self) + assert_stream::(Fuse::new(self)) } /// Borrows a stream, rather than consuming it. @@ -1021,7 +1022,7 @@ pub trait StreamExt: Stream { where Self: Sized + std::panic::UnwindSafe, { - CatchUnwind::new(self) + assert_stream(CatchUnwind::new(self)) } /// Wrap the stream in a Box, pinning it. @@ -1033,7 +1034,7 @@ pub trait StreamExt: Stream { where Self: Sized + Send + 'a, { - Box::pin(self) + assert_stream::(Box::pin(self)) } /// Wrap the stream in a Box, pinning it. @@ -1047,7 +1048,7 @@ pub trait StreamExt: Stream { where Self: Sized + 'a, { - Box::pin(self) + assert_stream::(Box::pin(self)) } /// An adaptor for creating a buffered list of pending futures. @@ -1069,7 +1070,7 @@ pub trait StreamExt: Stream { Self::Item: Future, Self: Sized, { - Buffered::new(self, n) + assert_stream::<::Output, _>(Buffered::new(self, n)) } /// An adaptor for creating a buffered list of pending futures (unordered). @@ -1114,7 +1115,7 @@ pub trait StreamExt: Stream { Self::Item: Future, Self: Sized, { - BufferUnordered::new(self, n) + assert_stream::<::Output, _>(BufferUnordered::new(self, n)) } /// An adapter for zipping two streams together. @@ -1144,7 +1145,7 @@ pub trait StreamExt: Stream { St: Stream, Self: Sized, { - Zip::new(self, other) + assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) } /// Adapter for chaining two streams. @@ -1175,7 +1176,7 @@ pub trait StreamExt: Stream { St: Stream, Self: Sized, { - Chain::new(self, other) + assert_stream::(Chain::new(self, other)) } /// Creates a new stream which exposes a `peek` method. @@ -1185,7 +1186,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Peekable::new(self) + assert_stream::(Peekable::new(self)) } /// An adaptor for chunking up items of the stream inside a vector. @@ -1211,7 +1212,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Chunks::new(self, capacity) + assert_stream::, _>(Chunks::new(self, capacity)) } /// An adaptor for chunking up ready items of the stream inside a vector. @@ -1277,7 +1278,8 @@ pub trait StreamExt: Stream { where Self: Sink + Sized, { - split::split(self) + let (sink, stream) = split::split(self); + (sink, assert_stream::(stream)) } /// Do something with each item of this stream, afterwards passing it on. @@ -1290,7 +1292,7 @@ pub trait StreamExt: Stream { F: FnMut(&Self::Item), Self: Sized, { - Inspect::new(self, f) + assert_stream::(Inspect::new(self, f)) } /// Wrap this stream in an `Either` stream, making it the left-hand variant @@ -1303,7 +1305,7 @@ pub trait StreamExt: Stream { B: Stream, Self: Sized, { - Either::Left(self) + assert_stream::(Either::Left(self)) } /// Wrap this stream in an `Either` stream, making it the right-hand variant @@ -1316,7 +1318,7 @@ pub trait StreamExt: Stream { B: Stream, Self: Sized, { - Either::Right(self) + assert_stream::(Either::Right(self)) } /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 4087298b7b..0c8e10840f 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -129,6 +129,8 @@ mod into_async_read; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; +use crate::future::assert_future; +use crate::stream::assert_stream; impl TryStreamExt for S {} @@ -156,7 +158,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, Self::Error: Into, { - ErrInto::new(self) + assert_stream::, _>(ErrInto::new(self)) } /// Wraps the current stream in a new stream which maps the success value @@ -181,7 +183,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, F: FnMut(Self::Ok) -> T, { - MapOk::new(self, f) + assert_stream::, _>(MapOk::new(self, f)) } /// Wraps the current stream in a new stream which maps the error value @@ -206,7 +208,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, F: FnMut(Self::Error) -> E, { - MapErr::new(self, f) + assert_stream::, _>(MapErr::new(self, f)) } /// Chain on a computation for when a value is ready, passing the successful @@ -253,7 +255,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - AndThen::new(self, f) + assert_stream::, _>(AndThen::new(self, f)) } /// Chain on a computation for when an error happens, passing the @@ -279,7 +281,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - OrElse::new(self, f) + assert_stream::, _>(OrElse::new(self, f)) } /// Do something with the success value of this stream, afterwards passing @@ -293,7 +295,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Ok), Self: Sized, { - InspectOk::new(self, f) + assert_stream::, _>(InspectOk::new(self, f)) } /// Do something with the error value of this stream, afterwards passing it on. @@ -306,7 +308,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Error), Self: Sized, { - InspectErr::new(self, f) + assert_stream::, _>(InspectErr::new(self, f)) } /// Wraps a [`TryStream`] into a type that implements @@ -334,7 +336,7 @@ pub trait TryStreamExt: TryStream { where Self: Sized, { - IntoStream::new(self) + assert_stream::, _>(IntoStream::new(self)) } /// Creates a future that attempts to resolve the next item in the stream. @@ -361,7 +363,7 @@ pub trait TryStreamExt: TryStream { where Self: Unpin, { - TryNext::new(self) + assert_future::, Self::Error>, _>(TryNext::new(self)) } /// Attempts to run this stream to completion, executing the provided @@ -403,7 +405,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TryForEach::new(self, f) + assert_future::, _>(TryForEach::new(self, f)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -433,7 +435,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TrySkipWhile::new(self, f) + assert_stream::, _>(TrySkipWhile::new(self, f)) } /// Take elements on this stream while the provided asynchronous predicate @@ -519,7 +521,11 @@ pub trait TryStreamExt: TryStream { Fut: Future>, Self: Sized, { - TryForEachConcurrent::new(self, limit.into(), f) + assert_future::, _>(TryForEachConcurrent::new( + self, + limit.into(), + f, + )) } /// Attempt to transform a stream into a collection, @@ -556,7 +562,7 @@ pub trait TryStreamExt: TryStream { where Self: Sized, { - TryCollect::new(self) + assert_future::, _>(TryCollect::new(self)) } /// Attempt to filter the values produced by this stream according to the @@ -595,7 +601,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Ok) -> Fut, Self: Sized, { - TryFilter::new(self, f) + assert_stream::, _>(TryFilter::new(self, f)) } /// Attempt to filter the values produced by this stream while @@ -636,7 +642,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(Self::Ok) -> Fut, Self: Sized, { - TryFilterMap::new(self, f) + assert_stream::, _>(TryFilterMap::new(self, f)) } /// Flattens a stream of streams into just one continuous stream. @@ -683,7 +689,9 @@ pub trait TryStreamExt: TryStream { ::Error: From, Self: Sized, { - TryFlatten::new(self) + assert_stream::::Ok, ::Error>, _>( + TryFlatten::new(self), + ) } /// Attempt to execute an accumulating asynchronous computation over a @@ -720,7 +728,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TryFold::new(self, f, init) + assert_future::, _>(TryFold::new(self, f, init)) } /// Attempt to concatenate all items of a stream into a single @@ -762,7 +770,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, Self::Ok: Extend<<::Ok as IntoIterator>::Item> + IntoIterator + Default, { - TryConcat::new(self) + assert_future::, _>(TryConcat::new(self)) } /// Attempt to execute several futures from a stream concurrently. @@ -829,7 +837,9 @@ pub trait TryStreamExt: TryStream { Self::Ok: TryFuture, Self: Sized, { - TryBufferUnordered::new(self, n) + assert_stream::::Ok, Self::Error>, _>( + TryBufferUnordered::new(self, n), + ) } // TODO: false positive warning from rustdoc. Verify once #43466 settles