From 9cc215e1bdde0f0cc4fa303b7b61c8e491dd7637 Mon Sep 17 00:00:00 2001 From: Stepan Koltsov Date: Fri, 3 Jan 2020 05:28:02 +0000 Subject: [PATCH 1/2] Use more assert_{future,stream} functions Use `assert_future` and `assert_stream` annotations in almost all `FutureExt`, `TryFutureExt`, `StreamExt` and `TryStreamExt` trait fns. My own style of working with third-party code is doing Cmd-Click in Idea and look at signature/implementation. I found these existing `assert_` annotations very helpful in understanding what this particular function does, what it expects, what it returns, much faster than reading documentation or opening actual future/stream implementation. For example, there's `TryStreamExt::and_then` function: ``` fn and_then(self, f: F) -> AndThen where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture, Self: Sized, { ... } ``` What does it do? Does it return a future or a stream? Is stream item original stream item or returned from future? Is error converted? Is it even `Try` object returned from this function? Now with the function body: ``` assert_stream::, _>(...) ``` it's easier to infer that function returns a stream with the same items as provided future. So this diff extends `assert_` function to almost all functions of these four traits. --- futures-util/src/future/future/mod.rs | 24 +++++---- futures-util/src/future/mod.rs | 2 +- futures-util/src/future/try_future/mod.rs | 24 +++++---- futures-util/src/stream/mod.rs | 9 ++++ futures-util/src/stream/stream/mod.rs | 66 ++++++++++++----------- futures-util/src/stream/try_stream/mod.rs | 48 ++++++++++------- 6 files changed, 101 insertions(+), 72 deletions(-) diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index e58cafc8c0..5d8fc75e97 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -77,6 +77,8 @@ pub use self::shared::Shared; mod chain; pub(crate) use self::chain::Chain; +use crate::never::Never; +use crate::stream::assert_stream; impl FutureExt for T where T: Future {} @@ -173,7 +175,7 @@ pub trait FutureExt: Future { B: Future, Self: Sized, { - Either::Left(self) + assert_future::(Either::Left(self)) } /// Wrap this future in an `Either` future, making it the right-hand variant @@ -203,7 +205,7 @@ pub trait FutureExt: Future { A: Future, Self: Sized, { - Either::Right(self) + assert_future::(Either::Right(self)) } /// Convert this future into a single element stream. @@ -228,7 +230,7 @@ pub trait FutureExt: Future { where Self: Sized, { - IntoStream::new(self) + assert_stream::(IntoStream::new(self)) } /// Flatten the execution of this future when the output of this @@ -292,7 +294,7 @@ pub trait FutureExt: Future { Self::Output: Stream, Self: Sized, { - FlattenStream::new(self) + assert_stream::<::Item, _>(FlattenStream::new(self)) } /// Fuse a future such that `poll` will never again be called once it has @@ -381,7 +383,9 @@ pub trait FutureExt: Future { where Self: Sized + ::std::panic::UnwindSafe, { - CatchUnwind::new(self) + assert_future::>, _>(CatchUnwind::new( + self, + )) } /// Create a cloneable handle to this future where all handles will resolve @@ -435,7 +439,7 @@ pub trait FutureExt: Future { Self: Sized, Self::Output: Clone, { - Shared::new(self) + assert_future::(Shared::new(self)) } /// Turn this future into a future that yields `()` on completion and sends @@ -464,7 +468,7 @@ pub trait FutureExt: Future { where Self: Sized + Send + 'a, { - Box::pin(self) + assert_future::(Box::pin(self)) } /// Wrap the future in a Box, pinning it. @@ -478,7 +482,7 @@ pub trait FutureExt: Future { where Self: Sized + 'a, { - Box::pin(self) + assert_future::(Box::pin(self)) } /// Turns a [`Future`](Future) into a @@ -487,7 +491,7 @@ pub trait FutureExt: Future { where Self: Sized, { - UnitError::new(self) + assert_future::, _>(UnitError::new(self)) } /// Turns a [`Future`](Future) into a @@ -496,7 +500,7 @@ pub trait FutureExt: Future { where Self: Sized, { - NeverError::new(self) + assert_future::, _>(NeverError::new(self)) } /// A convenience for calling `Future::poll` on `Unpin` future types. diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 3f4bb01436..d77c9e7f2a 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -102,7 +102,7 @@ cfg_target_has_atomic! { // Just a helper function to ensure the futures we're returning all have the // right implementations. -fn assert_future(future: F) -> F +pub(crate) fn assert_future(future: F) -> F where F: Future, { diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index e488b403c5..b0ca5549db 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -69,6 +69,8 @@ pub(crate) use self::flatten_stream_sink::FlattenStreamSink; mod try_chain; pub(crate) use self::try_chain::{TryChain, TryChainAction}; +use crate::future::assert_future; +use crate::stream::assert_stream; impl TryFutureExt for Fut {} @@ -157,7 +159,7 @@ pub trait TryFutureExt: TryFuture { F: FnOnce(Self::Ok) -> T, Self: Sized, { - MapOk::new(self, f) + assert_future::, _>(MapOk::new(self, f)) } /// Maps this future's error value to a different value. @@ -204,7 +206,7 @@ pub trait TryFutureExt: TryFuture { F: FnOnce(Self::Error) -> E, Self: Sized, { - MapErr::new(self, f) + assert_future::, _>(MapErr::new(self, f)) } /// Maps this future's [`Error`](TryFuture::Error) to a new error type @@ -234,7 +236,7 @@ pub trait TryFutureExt: TryFuture { Self: Sized, Self::Error: Into, { - ErrInto::new(self) + assert_future::, _>(ErrInto::new(self)) } /// Executes another future after this one resolves successfully. The @@ -279,7 +281,7 @@ pub trait TryFutureExt: TryFuture { Fut: TryFuture, Self: Sized, { - AndThen::new(self, f) + assert_future::, _>(AndThen::new(self, f)) } /// Executes another future if this one resolves to an error. The @@ -324,7 +326,7 @@ pub trait TryFutureExt: TryFuture { Fut: TryFuture, Self: Sized, { - OrElse::new(self, f) + assert_future::, _>(OrElse::new(self, f)) } /// Do something with the success value of a future before passing it on. @@ -350,7 +352,7 @@ pub trait TryFutureExt: TryFuture { F: FnOnce(&Self::Ok), Self: Sized, { - InspectOk::new(self, f) + assert_future::, _>(InspectOk::new(self, f)) } /// Do something with the error value of a future before passing it on. @@ -376,7 +378,7 @@ pub trait TryFutureExt: TryFuture { F: FnOnce(&Self::Error), Self: Sized, { - InspectErr::new(self, f) + assert_future::, _>(InspectErr::new(self, f)) } /// Flatten the execution of this future when the successful result of this @@ -409,7 +411,9 @@ pub trait TryFutureExt: TryFuture { Self::Ok: TryStream, Self: Sized, { - TryFlattenStream::new(self) + assert_stream::::Ok, Self::Error>, _>(TryFlattenStream::new( + self, + )) } /// Unwraps this future's ouput, producing a future with this future's @@ -439,7 +443,7 @@ pub trait TryFutureExt: TryFuture { Self: Sized, F: FnOnce(Self::Error) -> Self::Ok, { - UnwrapOrElse::new(self, f) + assert_future::(UnwrapOrElse::new(self, f)) } /// Wraps a [`TryFuture`] into a future compatable with libraries using @@ -477,7 +481,7 @@ pub trait TryFutureExt: TryFuture { where Self: Sized, { - IntoFuture::new(self) + assert_future::, _>(IntoFuture::new(self)) } /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 67bb39962a..8226c76e0b 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -94,3 +94,12 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; } + +// Just a helper function to ensure the futures we're returning all have the +// right implementations. +pub(crate) fn assert_stream(stream: S) -> S + where + S: Stream, +{ + stream +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 62da0f2cb2..fd2dde1806 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -3,7 +3,7 @@ //! This module contains a number of functions for working with `Stream`s, //! including the `StreamExt` trait which adds methods to `Stream` types. -use crate::future::Either; +use crate::future::{assert_future, Either}; #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::pin::Pin; @@ -153,6 +153,7 @@ mod catch_unwind; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::catch_unwind::CatchUnwind; +use crate::stream::assert_stream; impl StreamExt for T where T: Stream {} @@ -186,7 +187,7 @@ pub trait StreamExt: Stream { where Self: Unpin, { - Next::new(self) + assert_future::, _>(Next::new(self)) } /// Converts this stream into a future of `(next_item, tail_of_stream)`. @@ -221,7 +222,7 @@ pub trait StreamExt: Stream { where Self: Sized + Unpin, { - StreamFuture::new(self) + assert_future::<(Option, Self), _>(StreamFuture::new(self)) } /// Maps this stream's items to a different type, returning a new stream of @@ -252,7 +253,7 @@ pub trait StreamExt: Stream { F: FnMut(Self::Item) -> T, Self: Sized, { - Map::new(self, f) + assert_stream::(Map::new(self, f)) } /// Creates a stream which gives the current iteration count as well as @@ -297,7 +298,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Enumerate::new(self) + assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) } /// Filters the values produced by this stream according to the provided @@ -332,7 +333,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Filter::new(self, f) + assert_stream::(Filter::new(self, f)) } /// Filters the values produced by this stream while simultaneously mapping @@ -366,7 +367,7 @@ pub trait StreamExt: Stream { Fut: Future>, Self: Sized, { - FilterMap::new(self, f) + assert_stream::(FilterMap::new(self, f)) } /// Computes from this stream's items new items of a different type using @@ -397,7 +398,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Then::new(self, f) + assert_stream::(Then::new(self, f)) } /// Transforms a stream into a collection, returning a @@ -429,7 +430,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Collect::new(self) + assert_future::(Collect::new(self)) } /// Concatenate all items of a stream into a single extendable @@ -469,7 +470,7 @@ pub trait StreamExt: Stream { Self: Sized, Self::Item: Extend<<::Item as IntoIterator>::Item> + IntoIterator + Default, { - Concat::new(self) + assert_future::(Concat::new(self)) } /// Execute an accumulating asynchronous computation over a stream, @@ -498,7 +499,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Fold::new(self, f, init) + assert_future::(Fold::new(self, f, init)) } /// Flattens a stream of streams into just one continuous stream. @@ -537,7 +538,7 @@ pub trait StreamExt: Stream { Self::Item: Stream, Self: Sized, { - Flatten::new(self) + assert_stream::<::Item, _>(Flatten::new(self)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -568,7 +569,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - SkipWhile::new(self, f) + assert_stream::(SkipWhile::new(self, f)) } /// Take elements from this stream while the provided asynchronous predicate @@ -598,7 +599,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - TakeWhile::new(self, f) + assert_stream::(TakeWhile::new(self, f)) } /// Runs this stream to completion, executing the provided asynchronous @@ -640,7 +641,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - ForEach::new(self, f) + assert_future::<(), _>(ForEach::new(self, f)) } /// Runs this stream to completion, executing the provided asynchronous @@ -700,7 +701,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - ForEachConcurrent::new(self, limit.into(), f) + assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) } /// Creates a new stream of at most `n` items of the underlying stream. @@ -723,7 +724,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Take::new(self, n) + assert_stream::(Take::new(self, n)) } /// Creates a new stream which skips `n` items of the underlying stream. @@ -746,7 +747,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Skip::new(self, n) + assert_stream::(Skip::new(self, n)) } /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never @@ -792,7 +793,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Fuse::new(self) + assert_stream::(Fuse::new(self)) } /// Borrows a stream, rather than consuming it. @@ -870,7 +871,7 @@ pub trait StreamExt: Stream { where Self: Sized + std::panic::UnwindSafe, { - CatchUnwind::new(self) + assert_stream(CatchUnwind::new(self)) } /// Wrap the stream in a Box, pinning it. @@ -882,7 +883,7 @@ pub trait StreamExt: Stream { where Self: Sized + Send + 'a, { - Box::pin(self) + assert_stream::(Box::pin(self)) } /// Wrap the stream in a Box, pinning it. @@ -896,7 +897,7 @@ pub trait StreamExt: Stream { where Self: Sized + 'a, { - Box::pin(self) + assert_stream::(Box::pin(self)) } /// An adaptor for creating a buffered list of pending futures. @@ -918,7 +919,7 @@ pub trait StreamExt: Stream { Self::Item: Future, Self: Sized, { - Buffered::new(self, n) + assert_stream::<::Output, _>(Buffered::new(self, n)) } /// An adaptor for creating a buffered list of pending futures (unordered). @@ -963,7 +964,7 @@ pub trait StreamExt: Stream { Self::Item: Future, Self: Sized, { - BufferUnordered::new(self, n) + assert_stream::<::Output, _>(BufferUnordered::new(self, n)) } /// An adapter for zipping two streams together. @@ -993,7 +994,7 @@ pub trait StreamExt: Stream { St: Stream, Self: Sized, { - Zip::new(self, other) + assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) } /// Adapter for chaining two streams. @@ -1024,7 +1025,7 @@ pub trait StreamExt: Stream { St: Stream, Self: Sized, { - Chain::new(self, other) + assert_stream::(Chain::new(self, other)) } /// Creates a new stream which exposes a `peek` method. @@ -1034,7 +1035,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Peekable::new(self) + assert_stream::(Peekable::new(self)) } /// An adaptor for chunking up items of the stream inside a vector. @@ -1060,7 +1061,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Chunks::new(self, capacity) + assert_stream::, _>(Chunks::new(self, capacity)) } /// A future that completes after the given stream has been fully processed @@ -1098,7 +1099,8 @@ pub trait StreamExt: Stream { where Self: Sink + Sized, { - split::split(self) + let (sink, stream) = split::split(self); + (sink, assert_stream::(stream)) } /// Do something with each item of this stream, afterwards passing it on. @@ -1111,7 +1113,7 @@ pub trait StreamExt: Stream { F: FnMut(&Self::Item), Self: Sized, { - Inspect::new(self, f) + assert_stream::(Inspect::new(self, f)) } /// Wrap this stream in an `Either` stream, making it the left-hand variant @@ -1124,7 +1126,7 @@ pub trait StreamExt: Stream { B: Stream, Self: Sized, { - Either::Left(self) + assert_stream::(Either::Left(self)) } /// Wrap this stream in an `Either` stream, making it the right-hand variant @@ -1137,7 +1139,7 @@ pub trait StreamExt: Stream { B: Stream, Self: Sized, { - Either::Right(self) + assert_stream::(Either::Right(self)) } /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 6a7ced4f8c..b9104985bf 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -105,6 +105,8 @@ mod into_async_read; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; +use crate::future::assert_future; +use crate::stream::assert_stream; impl TryStreamExt for S {} @@ -132,7 +134,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, Self::Error: Into, { - ErrInto::new(self) + assert_stream::, _>(ErrInto::new(self)) } /// Wraps the current stream in a new stream which maps the success value @@ -157,7 +159,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, F: FnMut(Self::Ok) -> T, { - MapOk::new(self, f) + assert_stream::, _>(MapOk::new(self, f)) } /// Wraps the current stream in a new stream which maps the error value @@ -182,7 +184,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, F: FnMut(Self::Error) -> E, { - MapErr::new(self, f) + assert_stream::, _>(MapErr::new(self, f)) } /// Chain on a computation for when a value is ready, passing the successful @@ -229,7 +231,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - AndThen::new(self, f) + assert_stream::, _>(AndThen::new(self, f)) } /// Chain on a computation for when an error happens, passing the @@ -255,7 +257,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - OrElse::new(self, f) + assert_stream::, _>(OrElse::new(self, f)) } /// Do something with the success value of this stream, afterwards passing @@ -269,7 +271,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Ok), Self: Sized, { - InspectOk::new(self, f) + assert_stream::, _>(InspectOk::new(self, f)) } /// Do something with the error value of this stream, afterwards passing it on. @@ -282,7 +284,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Error), Self: Sized, { - InspectErr::new(self, f) + assert_stream::, _>(InspectErr::new(self, f)) } /// Wraps a [`TryStream`] into a type that implements @@ -310,7 +312,7 @@ pub trait TryStreamExt: TryStream { where Self: Sized, { - IntoStream::new(self) + assert_stream::, _>(IntoStream::new(self)) } /// Creates a future that attempts to resolve the next item in the stream. @@ -337,7 +339,7 @@ pub trait TryStreamExt: TryStream { where Self: Unpin, { - TryNext::new(self) + assert_future::, Self::Error>, _>(TryNext::new(self)) } /// Attempts to run this stream to completion, executing the provided @@ -379,7 +381,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TryForEach::new(self, f) + assert_future::, _>(TryForEach::new(self, f)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -408,7 +410,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TrySkipWhile::new(self, f) + assert_stream::, _>(TrySkipWhile::new(self, f)) } /// Attempts to run this stream to completion, executing the provided asynchronous @@ -464,7 +466,11 @@ pub trait TryStreamExt: TryStream { Fut: Future>, Self: Sized, { - TryForEachConcurrent::new(self, limit.into(), f) + assert_future::, _>(TryForEachConcurrent::new( + self, + limit.into(), + f, + )) } /// Attempt to transform a stream into a collection, @@ -501,7 +507,7 @@ pub trait TryStreamExt: TryStream { where Self: Sized, { - TryCollect::new(self) + assert_future::, _>(TryCollect::new(self)) } /// Attempt to filter the values produced by this stream according to the @@ -540,7 +546,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Ok) -> Fut, Self: Sized, { - TryFilter::new(self, f) + assert_stream::, _>(TryFilter::new(self, f)) } /// Attempt to filter the values produced by this stream while @@ -581,7 +587,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(Self::Ok) -> Fut, Self: Sized, { - TryFilterMap::new(self, f) + assert_stream::, _>(TryFilterMap::new(self, f)) } /// Flattens a stream of streams into just one continuous stream. @@ -628,7 +634,9 @@ pub trait TryStreamExt: TryStream { ::Error: From, Self: Sized, { - TryFlatten::new(self) + assert_stream::::Ok, ::Error>, _>( + TryFlatten::new(self), + ) } /// Attempt to execute an accumulating asynchronous computation over a @@ -665,7 +673,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TryFold::new(self, f, init) + assert_future::, _>(TryFold::new(self, f, init)) } /// Attempt to concatenate all items of a stream into a single @@ -707,7 +715,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, Self::Ok: Extend<<::Ok as IntoIterator>::Item> + IntoIterator + Default, { - TryConcat::new(self) + assert_future::, _>(TryConcat::new(self)) } /// Attempt to execute several futures from a stream concurrently. @@ -774,7 +782,9 @@ pub trait TryStreamExt: TryStream { Self::Ok: TryFuture, Self: Sized, { - TryBufferUnordered::new(self, n) + assert_stream::::Ok, Self::Error>, _>( + TryBufferUnordered::new(self, n), + ) } // TODO: false positive warning from rustdoc. Verify once #43466 settles From 2f996cd58efbea3c9595c578765b35333b25627c Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 22 Sep 2020 18:34:27 +0900 Subject: [PATCH 2/2] fix imports --- futures-util/src/future/future/mod.rs | 3 ++- futures-util/src/future/try_future/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index ca23db5052..f5d5dd2667 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -3,11 +3,12 @@ //! This module contains a number of functions for working with `Future`s, //! including the `FutureExt` trait which adds methods to `Future` types. -use super::{assert_future, Either}; #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::pin::Pin; +use crate::future::{assert_future, Either}; +use crate::stream::assert_stream; use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; use crate::never::Never; #[cfg(feature = "alloc")] diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index 23ecaca9ef..1ce01d2de0 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -14,13 +14,13 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; -use super::assert_future; use crate::fns::{ inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn, unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn, UnwrapOrElseFn, }; -use crate::future::{Inspect, Map}; +use crate::future::{assert_future, Inspect, Map}; +use crate::stream::assert_stream; // Combinators mod into_future;