diff --git a/futures-core/src/future.rs b/futures-core/src/future.rs index 7540cd027e..d60c6dc9e5 100644 --- a/futures-core/src/future.rs +++ b/futures-core/src/future.rs @@ -2,7 +2,6 @@ use core::ops::DerefMut; use core::pin::Pin; -use core::task::{Context, Poll}; #[doc(no_inline)] pub use core::future::Future; @@ -20,10 +19,10 @@ pub type LocalBoxFuture<'a, T> = Pin + /// should no longer be polled. /// /// `is_terminated` will return `true` if a future should no longer be polled. -/// Usually, this state occurs after `poll` (or `try_poll`) returned -/// `Poll::Ready`. However, `is_terminated` may also return `true` if a future -/// has become inactive and can no longer make progress and should be ignored -/// or dropped rather than being `poll`ed again. +/// Usually, this state occurs after `poll` returned `Poll::Ready`. However, +/// `is_terminated` may also return `true` if a future has become inactive +/// and can no longer make progress and should be ignored or dropped rather +/// than being `poll`ed again. pub trait FusedFuture: Future { /// Returns `true` if the underlying future should no longer be polled. fn is_terminated(&self) -> bool; @@ -45,29 +44,14 @@ where } } -mod private_try_future { - use super::Future; - - pub trait Sealed {} - - impl Sealed for F where F: ?Sized + Future> {} -} - /// A convenience for futures that return `Result` values that includes /// a variety of adapters tailored to such futures. -pub trait TryFuture: Future + private_try_future::Sealed { +pub trait TryFuture: Future> { /// The type of successful values yielded by this future type Ok; /// The type of failures yielded by this future type Error; - - /// Poll this `TryFuture` as if it were a `Future`. - /// - /// This method is a stopgap for a compiler limitation that prevents us from - /// directly inheriting from the `Future` trait; in the future it won't be - /// needed. - fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } impl TryFuture for F @@ -76,11 +60,6 @@ where { type Ok = T; type Error = E; - - #[inline] - fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.poll(cx) - } } #[cfg(feature = "alloc")] diff --git a/futures-util/src/compat/compat03as01.rs b/futures-util/src/compat/compat03as01.rs index 5d3a6e920b..5617353f26 100644 --- a/futures-util/src/compat/compat03as01.rs +++ b/futures-util/src/compat/compat03as01.rs @@ -102,7 +102,7 @@ where type Error = Fut::Error; fn poll(&mut self) -> Poll01 { - with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx))) + with_context(self, |inner, cx| poll_03_to_01(inner.poll(cx))) } } diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index e3838b7ad2..efe21587b2 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -40,8 +40,8 @@ pub use self::future::{Shared, WeakShared}; mod try_future; pub use self::try_future::{ - AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto, - OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, + AndThen, ErrInto, InspectErr, InspectOk, MapErr, MapOk, MapOkOrElse, OkInto, OrElse, + TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, }; #[cfg(feature = "sink")] diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 5d5579930b..afb4c865f9 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -1,5 +1,5 @@ use super::assert_future; -use crate::future::TryFutureExt; +use crate::future::FutureExt; use alloc::vec::Vec; use core::iter::FromIterator; use core::mem; @@ -49,7 +49,7 @@ impl Future for SelectOk { // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { let item = - self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { + self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.poll_unpin(cx) { Poll::Pending => None, Poll::Ready(e) => Some((i, e)), }); diff --git a/futures-util/src/future/try_future/into_future.rs b/futures-util/src/future/try_future/into_future.rs deleted file mode 100644 index 9f093d0e2e..0000000000 --- a/futures-util/src/future/try_future/into_future.rs +++ /dev/null @@ -1,36 +0,0 @@ -use core::pin::Pin; -use futures_core::future::{FusedFuture, Future, TryFuture}; -use futures_core::task::{Context, Poll}; -use pin_project_lite::pin_project; - -pin_project! { - /// Future for the [`into_future`](super::TryFutureExt::into_future) method. - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct IntoFuture { - #[pin] - future: Fut, - } -} - -impl IntoFuture { - #[inline] - pub(crate) fn new(future: Fut) -> Self { - Self { future } - } -} - -impl FusedFuture for IntoFuture { - fn is_terminated(&self) -> bool { - self.future.is_terminated() - } -} - -impl Future for IntoFuture { - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().future.try_poll(cx) - } -} diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index e5bc700714..6686329f92 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -5,12 +5,7 @@ #[cfg(feature = "compat")] use crate::compat::Compat; -use core::pin::Pin; -use futures_core::{ - future::TryFuture, - stream::TryStream, - task::{Context, Poll}, -}; +use futures_core::{future::TryFuture, stream::TryStream}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -23,7 +18,6 @@ use crate::future::{assert_future, Inspect, Map}; use crate::stream::assert_stream; // Combinators -mod into_future; mod try_flatten; mod try_flatten_err; @@ -89,46 +83,43 @@ delegate_all!( delegate_all!( /// Future for the [`inspect_ok`](super::TryFutureExt::inspect_ok) method. InspectOk( - Inspect, InspectOkFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_ok_fn(f))] + Inspect> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(x, inspect_ok_fn(f))] ); delegate_all!( /// Future for the [`inspect_err`](super::TryFutureExt::inspect_err) method. InspectErr( - Inspect, InspectErrFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_err_fn(f))] + Inspect> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(x, inspect_err_fn(f))] ); -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::into_future::IntoFuture; - delegate_all!( /// Future for the [`map_ok`](TryFutureExt::map_ok) method. MapOk( - Map, MapOkFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_ok_fn(f))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(x, map_ok_fn(f))] ); delegate_all!( /// Future for the [`map_err`](TryFutureExt::map_err) method. MapErr( - Map, MapErrFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_err_fn(f))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(x, map_err_fn(f))] ); delegate_all!( /// Future for the [`map_ok_or_else`](TryFutureExt::map_ok_or_else) method. MapOkOrElse( - Map, MapOkOrElseFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F, g: G| Map::new(IntoFuture::new(x), map_ok_or_else_fn(f, g))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F, g: G| Map::new(x, map_ok_or_else_fn(f, g))] ); delegate_all!( /// Future for the [`unwrap_or_else`](TryFutureExt::unwrap_or_else) method. UnwrapOrElse( - Map, UnwrapOrElseFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), unwrap_or_else_fn(f))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(x, unwrap_or_else_fn(f))] ); impl TryFutureExt for Fut {} @@ -585,41 +576,4 @@ pub trait TryFutureExt: TryFuture { { Compat::new(self) } - - /// Wraps a [`TryFuture`] into a type that implements - /// [`Future`](std::future::Future). - /// - /// [`TryFuture`]s currently do not implement the - /// [`Future`](std::future::Future) trait due to limitations of the - /// compiler. - /// - /// # Examples - /// - /// ``` - /// use futures::future::{Future, TryFuture, TryFutureExt}; - /// - /// # type T = i32; - /// # type E = (); - /// fn make_try_future() -> impl TryFuture { // ... } - /// # async { Ok::(1) } - /// # } - /// fn take_future(future: impl Future>) { /* ... */ } - /// - /// take_future(make_try_future().into_future()); - /// ``` - fn into_future(self) -> IntoFuture - where - Self: Sized, - { - assert_future::, _>(IntoFuture::new(self)) - } - - /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] - /// future types. - fn try_poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll> - where - Self: Unpin, - { - Pin::new(self).try_poll(cx) - } } diff --git a/futures-util/src/future/try_future/try_flatten.rs b/futures-util/src/future/try_future/try_flatten.rs index 4587ae8493..2749122b03 100644 --- a/futures-util/src/future/try_future/try_flatten.rs +++ b/futures-util/src/future/try_future/try_flatten.rs @@ -43,7 +43,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + TryFlattenProj::First { f } => match ready!(f.poll(cx)) { Ok(f) => self.set(Self::Second { f }), Err(e) => { self.set(Self::Empty); @@ -51,7 +51,7 @@ where } }, TryFlattenProj::Second { f } => { - let output = ready!(f.try_poll(cx)); + let output = ready!(f.poll(cx)); self.set(Self::Empty); break output; } @@ -81,7 +81,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + TryFlattenProj::First { f } => match ready!(f.poll(cx)) { Ok(f) => self.set(Self::Second { f }), Err(e) => { self.set(Self::Empty); @@ -112,7 +112,7 @@ where fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + TryFlattenProj::First { f } => match ready!(f.poll(cx)) { Ok(f) => self.set(Self::Second { f }), Err(e) => { self.set(Self::Empty); diff --git a/futures-util/src/future/try_future/try_flatten_err.rs b/futures-util/src/future/try_future/try_flatten_err.rs index 6b371644d5..d56d77a76f 100644 --- a/futures-util/src/future/try_future/try_flatten_err.rs +++ b/futures-util/src/future/try_future/try_flatten_err.rs @@ -40,7 +40,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenErrProj::First { f } => match ready!(f.try_poll(cx)) { + TryFlattenErrProj::First { f } => match ready!(f.poll(cx)) { Err(f) => self.set(Self::Second { f }), Ok(e) => { self.set(Self::Empty); @@ -48,7 +48,7 @@ where } }, TryFlattenErrProj::Second { f } => { - let output = ready!(f.try_poll(cx)); + let output = ready!(f.poll(cx)); self.set(Self::Empty); break output; } diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 506f450657..946bb1cd64 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,11 +10,10 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, TryFuture, TryMaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; -use crate::TryFutureExt; enum FinalState { Pending, @@ -36,11 +35,11 @@ where F: TryFuture, { Small { - elems: Pin>]>>, + elems: Pin]>>, }, #[cfg(not(futures_no_atomic_cas))] Big { - fut: TryCollect>, Vec>, + fut: TryCollect, Vec>, }, } @@ -119,7 +118,7 @@ where I: IntoIterator, I::Item: TryFuture, { - let iter = iter.into_iter().map(TryFutureExt::into_future); + let iter = iter.into_iter(); #[cfg(futures_no_atomic_cas)] { @@ -159,7 +158,7 @@ where let mut state = FinalState::AllDone; for elem in join_all::iter_pin_mut(elems.as_mut()) { - match elem.try_poll(cx) { + match elem.poll(cx) { Poll::Pending => state = FinalState::Pending, Poll::Ready(Ok(())) => {} Poll::Ready(Err(e)) => { diff --git a/futures-util/src/future/try_maybe_done.rs b/futures-util/src/future/try_maybe_done.rs index 24044d2c27..4cb19a49ef 100644 --- a/futures-util/src/future/try_maybe_done.rs +++ b/futures-util/src/future/try_maybe_done.rs @@ -76,7 +76,7 @@ impl Future for TryMaybeDone { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unsafe { match self.as_mut().get_unchecked_mut() { - TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).try_poll(cx)) { + TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).poll(cx)) { Ok(res) => self.set(Self::Done(res)), Err(e) => { self.set(Self::Gone); diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index bc282f7db1..1c79fea928 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -1,4 +1,4 @@ -use crate::future::{Either, TryFutureExt}; +use crate::future::{Either, FutureExt}; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; @@ -69,10 +69,10 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.try_poll_unpin(cx) { + match a.poll_unpin(cx) { Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))), Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))), - Poll::Pending => match b.try_poll_unpin(cx) { + Poll::Pending => match b.poll_unpin(cx) { Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))), Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))), Poll::Pending => { diff --git a/futures-util/src/stream/stream/try_fold.rs b/futures-util/src/stream/stream/try_fold.rs index fcbc0aef24..e02771234a 100644 --- a/futures-util/src/stream/stream/try_fold.rs +++ b/futures-util/src/stream/stream/try_fold.rs @@ -70,7 +70,7 @@ where Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { // we're currently processing a future to produce a new accum value - let res = ready!(fut.try_poll(cx)); + let res = ready!(fut.poll(cx)); this.future.set(None); match res { Ok(a) => *this.accum = Some(a), diff --git a/futures-util/src/stream/stream/try_for_each.rs b/futures-util/src/stream/stream/try_for_each.rs index c892eb83c7..7792c27f7b 100644 --- a/futures-util/src/stream/stream/try_for_each.rs +++ b/futures-util/src/stream/stream/try_for_each.rs @@ -54,7 +54,7 @@ where let mut this = self.project(); loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - ready!(fut.try_poll(cx))?; + ready!(fut.poll(cx))?; this.future.set(None); } else { match ready!(this.stream.as_mut().poll_next(cx)) { diff --git a/futures-util/src/stream/try_stream/and_then.rs b/futures-util/src/stream/try_stream/and_then.rs index 2f8b6f2589..134a731c12 100644 --- a/futures-util/src/stream/try_stream/and_then.rs +++ b/futures-util/src/stream/try_stream/and_then.rs @@ -59,7 +59,7 @@ where Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - let item = ready!(fut.try_poll(cx)); + let item = ready!(fut.poll(cx)); this.future.set(None); break Some(item); } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { diff --git a/futures-util/src/stream/try_stream/or_else.rs b/futures-util/src/stream/try_stream/or_else.rs index 53aceb8e64..49c3fcf36e 100644 --- a/futures-util/src/stream/try_stream/or_else.rs +++ b/futures-util/src/stream/try_stream/or_else.rs @@ -59,7 +59,7 @@ where Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - let item = ready!(fut.try_poll(cx)); + let item = ready!(fut.poll(cx)); this.future.set(None); break Some(item); } else { diff --git a/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/futures-util/src/stream/try_stream/try_buffer_unordered.rs index 21b00b20ac..1d3c38b1d9 100644 --- a/futures-util/src/stream/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/stream/try_stream/try_buffer_unordered.rs @@ -1,4 +1,3 @@ -use crate::future::{IntoFuture, TryFutureExt}; use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; @@ -19,7 +18,7 @@ pin_project! { { #[pin] stream: Fuse>, - in_progress_queue: FuturesUnordered>, + in_progress_queue: FuturesUnordered, max: Option, } } @@ -54,7 +53,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 83e9093fbc..6b9dd0f7ec 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -1,4 +1,3 @@ -use crate::future::{IntoFuture, TryFutureExt}; use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; @@ -20,7 +19,7 @@ pin_project! { { #[pin] stream: Fuse>, - in_progress_queue: FuturesOrdered>, + in_progress_queue: FuturesOrdered, max: Option, } } @@ -55,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/try_stream/try_filter_map.rs b/futures-util/src/stream/try_stream/try_filter_map.rs index ed1201732b..1220e0c4dd 100644 --- a/futures-util/src/stream/try_stream/try_filter_map.rs +++ b/futures-util/src/stream/try_stream/try_filter_map.rs @@ -67,7 +67,7 @@ where Poll::Ready(loop { if let Some(p) = this.pending.as_mut().as_pin_mut() { // We have an item in progress, poll that until it's done - let res = ready!(p.try_poll(cx)); + let res = ready!(p.poll(cx)); this.pending.set(None); let item = res?; if item.is_some() { diff --git a/futures-util/src/stream/try_stream/try_skip_while.rs b/futures-util/src/stream/try_stream/try_skip_while.rs index 52aa2d478b..fb56c27801 100644 --- a/futures-util/src/stream/try_stream/try_skip_while.rs +++ b/futures-util/src/stream/try_stream/try_skip_while.rs @@ -69,7 +69,7 @@ where Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { - let res = ready!(fut.try_poll(cx)); + let res = ready!(fut.poll(cx)); this.pending_fut.set(None); let skipped = res?; let item = this.pending_item.take(); diff --git a/futures-util/src/stream/try_stream/try_take_while.rs b/futures-util/src/stream/try_stream/try_take_while.rs index 4b5ff1ad38..3b50ee3e43 100644 --- a/futures-util/src/stream/try_stream/try_take_while.rs +++ b/futures-util/src/stream/try_stream/try_take_while.rs @@ -72,7 +72,7 @@ where Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { - let res = ready!(fut.try_poll(cx)); + let res = ready!(fut.poll(cx)); this.pending_fut.set(None); let take = res?; let item = this.pending_item.take(); diff --git a/futures-util/src/stream/try_stream/try_unfold.rs b/futures-util/src/stream/try_stream/try_unfold.rs index fd9cdf1d8c..15256b0020 100644 --- a/futures-util/src/stream/try_stream/try_unfold.rs +++ b/futures-util/src/stream/try_stream/try_unfold.rs @@ -105,7 +105,7 @@ where Poll::Ready(None) } Some(future) => { - let step = ready!(future.try_poll(cx)); + let step = ready!(future.poll(cx)); this.fut.set(None); match step { diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index f1185d1b7b..5d9029ae00 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -333,13 +333,6 @@ pub mod future { assert_impl!(InspectOk: Unpin); assert_not_impl!(InspectOk: Unpin); - assert_impl!(IntoFuture: Send); - assert_not_impl!(IntoFuture: Send); - assert_impl!(IntoFuture: Sync); - assert_not_impl!(IntoFuture: Sync); - assert_impl!(IntoFuture: Unpin); - assert_not_impl!(IntoFuture: Unpin); - assert_impl!(IntoStream: Send); assert_not_impl!(IntoStream: Send); assert_impl!(IntoStream: Sync);