From 8123a22e54aed1c2e85e58ea534036413e8b255c Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 17 Jan 2021 17:38:15 +0900 Subject: [PATCH] Use assert_* functions for all future/stream/sink/io combinators --- futures-util/src/future/abortable.rs | 5 +- futures-util/src/future/future/mod.rs | 7 ++- futures-util/src/future/join.rs | 25 +++++--- futures-util/src/future/join_all.rs | 4 +- futures-util/src/future/lazy.rs | 3 +- futures-util/src/future/maybe_done.rs | 3 +- futures-util/src/future/pending.rs | 5 +- futures-util/src/future/poll_fn.rs | 3 +- futures-util/src/future/ready.rs | 3 +- futures-util/src/future/select.rs | 3 +- futures-util/src/future/select_all.rs | 3 +- futures-util/src/future/select_ok.rs | 3 +- futures-util/src/future/try_future/mod.rs | 4 +- futures-util/src/future/try_join.rs | 16 +++-- futures-util/src/future/try_join_all.rs | 6 +- futures-util/src/future/try_maybe_done.rs | 3 +- futures-util/src/future/try_select.rs | 5 +- futures-util/src/io/mod.rs | 60 ++++++++++++------- futures-util/src/sink/drain.rs | 3 +- futures-util/src/sink/mod.rs | 41 ++++++++----- futures-util/src/sink/unfold.rs | 5 +- futures-util/src/stream/empty.rs | 5 +- futures-util/src/stream/iter.rs | 5 +- futures-util/src/stream/mod.rs | 6 +- futures-util/src/stream/once.rs | 3 +- futures-util/src/stream/pending.rs | 3 +- futures-util/src/stream/poll_fn.rs | 3 +- futures-util/src/stream/repeat.rs | 3 +- futures-util/src/stream/repeat_with.rs | 3 +- futures-util/src/stream/select.rs | 5 +- futures-util/src/stream/select_all.rs | 3 +- futures-util/src/stream/stream/mod.rs | 30 ++++++---- futures-util/src/stream/try_stream/mod.rs | 10 ++-- .../src/stream/try_stream/try_unfold.rs | 5 +- futures-util/src/stream/unfold.rs | 5 +- 35 files changed, 192 insertions(+), 107 deletions(-) diff --git a/futures-util/src/future/abortable.rs b/futures-util/src/future/abortable.rs index 1fc75b08fc..3f2e5a064d 100644 --- a/futures-util/src/future/abortable.rs +++ b/futures-util/src/future/abortable.rs @@ -1,3 +1,4 @@ +use super::assert_future; use crate::task::AtomicWaker; use futures_core::future::Future; use futures_core::task::{Context, Poll}; @@ -39,10 +40,10 @@ impl Abortable where Fut: Future { /// # }); /// ``` pub fn new(future: Fut, reg: AbortRegistration) -> Self { - Self { + assert_future::, _>(Self { future, inner: reg.inner, - } + }) } } diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index f01d3462f7..c11d108207 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -7,10 +7,10 @@ use alloc::boxed::Box; use core::pin::Pin; -use crate::future::{assert_future, Either}; -use crate::stream::assert_stream; use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; +use crate::future::{assert_future, Either}; use crate::never::Never; +use crate::stream::assert_stream; #[cfg(feature = "alloc")] use futures_core::future::{BoxFuture, LocalBoxFuture}; use futures_core::{ @@ -506,7 +506,8 @@ pub trait FutureExt: Future { where Self: Sized, { - remote_handle::remote_handle(self) + let (wrapped, handle) = remote_handle::remote_handle(self); + (assert_future::<(), _>(wrapped), handle) } /// Wrap the future in a Box, pinning it. diff --git a/futures-util/src/future/join.rs b/futures-util/src/future/join.rs index cfe53a72c8..a8183433df 100644 --- a/futures-util/src/future/join.rs +++ b/futures-util/src/future/join.rs @@ -1,14 +1,13 @@ #![allow(non_snake_case)] -use crate::future::{MaybeDone, maybe_done}; +use super::assert_future; +use crate::future::{maybe_done, MaybeDone}; use core::fmt; use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; -use super::assert_future; - macro_rules! generate { ($( $(#[$doc:meta])* @@ -144,7 +143,8 @@ where Fut2: Future, Fut3: Future, { - Join3::new(future1, future2, future3) + let f = Join3::new(future1, future2, future3); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output), _>(f) } /// Same as [`join`](join()), but with more futures. @@ -176,7 +176,8 @@ where Fut3: Future, Fut4: Future, { - Join4::new(future1, future2, future3, future4) + let f = Join4::new(future1, future2, future3, future4); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output), _>(f) } /// Same as [`join`](join()), but with more futures. @@ -211,5 +212,15 @@ where Fut4: Future, Fut5: Future, { - Join5::new(future1, future2, future3, future4, future5) + let f = Join5::new(future1, future2, future3, future4, future5); + assert_future::< + ( + Fut1::Output, + Fut2::Output, + Fut3::Output, + Fut4::Output, + Fut5::Output, + ), + _, + >(f) } diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 0c8357ca68..7ccf869042 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -10,7 +10,7 @@ use core::task::{Context, Poll}; use alloc::boxed::Box; use alloc::vec::Vec; -use super::MaybeDone; +use super::{MaybeDone, assert_future}; fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's @@ -85,7 +85,7 @@ where I::Item: Future, { let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect(); - JoinAll { elems: elems.into() } + assert_future::::Output>, _>(JoinAll { elems: elems.into() }) } impl Future for JoinAll diff --git a/futures-util/src/future/lazy.rs b/futures-util/src/future/lazy.rs index 409717a2ab..42812d3893 100644 --- a/futures-util/src/future/lazy.rs +++ b/futures-util/src/future/lazy.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; @@ -34,7 +35,7 @@ impl Unpin for Lazy {} pub fn lazy(f: F) -> Lazy where F: FnOnce(&mut Context<'_>) -> R, { - Lazy { f: Some(f) } + assert_future::(Lazy { f: Some(f) }) } impl FusedFuture for Lazy diff --git a/futures-util/src/future/maybe_done.rs b/futures-util/src/future/maybe_done.rs index bb5579e859..26e6c27588 100644 --- a/futures-util/src/future/maybe_done.rs +++ b/futures-util/src/future/maybe_done.rs @@ -1,5 +1,6 @@ //! Definition of the MaybeDone combinator +use super::assert_future; use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; @@ -40,7 +41,7 @@ impl Unpin for MaybeDone {} /// # }); /// ``` pub fn maybe_done(future: Fut) -> MaybeDone { - MaybeDone::Future(future) + assert_future::<(), _>(MaybeDone::Future(future)) } impl MaybeDone { diff --git a/futures-util/src/future/pending.rs b/futures-util/src/future/pending.rs index 5a7bbb8d59..4311b9a228 100644 --- a/futures-util/src/future/pending.rs +++ b/futures-util/src/future/pending.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::marker; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; @@ -33,9 +34,9 @@ impl FusedFuture for Pending { /// # }); /// ``` pub fn pending() -> Pending { - Pending { + assert_future::(Pending { _data: marker::PhantomData, - } + }) } impl Future for Pending { diff --git a/futures-util/src/future/poll_fn.rs b/futures-util/src/future/poll_fn.rs index b7b10be85d..6ac1ab818e 100644 --- a/futures-util/src/future/poll_fn.rs +++ b/futures-util/src/future/poll_fn.rs @@ -1,5 +1,6 @@ //! Definition of the `PollFn` adapter combinator +use super::assert_future; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -36,7 +37,7 @@ pub fn poll_fn(f: F) -> PollFn where F: FnMut(&mut Context<'_>) -> Poll { - PollFn { f } + assert_future::(PollFn { f }) } impl fmt::Debug for PollFn { diff --git a/futures-util/src/future/ready.rs b/futures-util/src/future/ready.rs index 35f01c9b16..e3d791b3cf 100644 --- a/futures-util/src/future/ready.rs +++ b/futures-util/src/future/ready.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; @@ -45,7 +46,7 @@ impl Future for Ready { /// # }); /// ``` pub fn ready(t: T) -> Ready { - Ready(Some(t)) + assert_future::(Ready(Some(t))) } /// Create a future that is immediately ready with a success value. diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index bc247796dd..fc4316afa2 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; @@ -75,7 +76,7 @@ impl Unpin for Select {} pub fn select(future1: A, future2: B) -> Select where A: Future + Unpin, B: Future + Unpin { - Select { inner: Some((future1, future2)) } + assert_future::, _>(Select { inner: Some((future1, future2)) }) } impl Future for Select diff --git a/futures-util/src/future/select_all.rs b/futures-util/src/future/select_all.rs index 9f7fb245bf..0db90a750e 100644 --- a/futures-util/src/future/select_all.rs +++ b/futures-util/src/future/select_all.rs @@ -1,3 +1,4 @@ +use super::assert_future; use crate::future::FutureExt; use core::iter::FromIterator; use core::mem; @@ -38,7 +39,7 @@ pub fn select_all(iter: I) -> SelectAll inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty()); - ret + assert_future::<(::Output, usize, Vec), _>(ret) } impl Future for SelectAll { diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 7f4f4d65f4..52d393c28e 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -1,3 +1,4 @@ +use super::assert_future; use crate::future::TryFutureExt; use core::iter::FromIterator; use core::mem; @@ -36,7 +37,7 @@ pub fn select_ok(iter: I) -> SelectOk inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); - ret + assert_future::::Ok, Vec), ::Error>, _>(ret) } impl Future for SelectOk { diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index 1ce01d2de0..40158074a1 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -173,7 +173,7 @@ pub trait TryFutureExt: TryFuture { Self::Ok: Sink, Self: Sized, { - FlattenSink::new(self) + crate::sink::assert_sink::(FlattenSink::new(self)) } /// Maps this future's success value to a different value. @@ -501,7 +501,7 @@ pub trait TryFutureExt: TryFuture { Self::Ok: TryFuture, Self: Sized, { - TryFlatten::new(self) + assert_future::::Ok, Self::Error>, _>(TryFlatten::new(self)) } /// Flatten the execution of this future when the successful result of this diff --git a/futures-util/src/future/try_join.rs b/futures-util/src/future/try_join.rs index 25ccdde39c..de32ce3ef4 100644 --- a/futures-util/src/future/try_join.rs +++ b/futures-util/src/future/try_join.rs @@ -1,6 +1,6 @@ #![allow(non_snake_case)] -use crate::future::{TryMaybeDone, try_maybe_done}; +use crate::future::{assert_future, try_maybe_done, TryMaybeDone}; use core::fmt; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; @@ -150,7 +150,7 @@ where Fut1: TryFuture, Fut2: TryFuture, { - TryJoin::new(future1, future2) + assert_future::, _>(TryJoin::new(future1, future2)) } /// Same as [`try_join`](try_join()), but with more futures. @@ -179,7 +179,9 @@ where Fut2: TryFuture, Fut3: TryFuture, { - TryJoin3::new(future1, future2, future3) + assert_future::, _>(TryJoin3::new( + future1, future2, future3, + )) } /// Same as [`try_join`](try_join()), but with more futures. @@ -211,7 +213,9 @@ where Fut3: TryFuture, Fut4: TryFuture, { - TryJoin4::new(future1, future2, future3, future4) + assert_future::, _>( + TryJoin4::new(future1, future2, future3, future4), + ) } /// Same as [`try_join`](try_join()), but with more futures. @@ -246,5 +250,7 @@ where Fut4: TryFuture, Fut5: TryFuture, { - TryJoin5::new(future1, future2, future3, future4, future5) + assert_future::, _>( + TryJoin5::new(future1, future2, future3, future4, future5), + ) } diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 4de0a79b94..371f753f3f 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,7 +10,7 @@ use core::task::{Context, Poll}; use alloc::boxed::Box; use alloc::vec::Vec; -use super::{TryFuture, TryMaybeDone}; +use super::{assert_future, TryFuture, TryMaybeDone}; fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's @@ -93,9 +93,9 @@ where I::Item: TryFuture, { let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - TryJoinAll { + assert_future::::Ok>, ::Error>, _>(TryJoinAll { elems: elems.into(), - } + }) } impl Future for TryJoinAll diff --git a/futures-util/src/future/try_maybe_done.rs b/futures-util/src/future/try_maybe_done.rs index 90067e90be..dfd290065d 100644 --- a/futures-util/src/future/try_maybe_done.rs +++ b/futures-util/src/future/try_maybe_done.rs @@ -1,5 +1,6 @@ //! Definition of the TryMaybeDone combinator +use super::assert_future; use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; @@ -25,7 +26,7 @@ impl Unpin for TryMaybeDone {} /// Wraps a future into a `TryMaybeDone` pub fn try_maybe_done(future: Fut) -> TryMaybeDone { - TryMaybeDone::Future(future) + assert_future::, _>(TryMaybeDone::Future(future)) } impl TryMaybeDone { diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index 56564f5b5c..b26eed35f0 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -50,7 +50,10 @@ impl Unpin for TrySelect {} pub fn try_select(future1: A, future2: B) -> TrySelect where A: TryFuture + Unpin, B: TryFuture + Unpin { - TrySelect { inner: Some((future1, future2)) } + super::assert_future::, + Either<(A::Error, B), (B::Error, A)>, + >, _>(TrySelect { inner: Some((future1, future2)) }) } impl Future for TrySelect diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index a7e2add34d..c2822640ab 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -19,6 +19,8 @@ #[cfg(feature = "io-compat")] #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] use crate::compat::Compat; +use crate::future::assert_future; +use crate::stream::assert_stream; use std::{ptr, pin::Pin}; pub use futures_io::{ @@ -173,7 +175,7 @@ pub trait AsyncReadExt: AsyncRead { Self: Sized, R: AsyncRead, { - Chain::new(self, next) + assert_read(Chain::new(self, next)) } /// Tries to read some bytes directly into the given `buf` in asynchronous @@ -203,7 +205,7 @@ pub trait AsyncReadExt: AsyncRead { fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin, { - Read::new(self, buf) + assert_future::, _>(Read::new(self, buf)) } /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored @@ -214,7 +216,7 @@ pub trait AsyncReadExt: AsyncRead { fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin, { - ReadVectored::new(self, bufs) + assert_future::, _>(ReadVectored::new(self, bufs)) } /// Creates a future which will read exactly enough bytes to fill `buf`, @@ -260,7 +262,7 @@ pub trait AsyncReadExt: AsyncRead { ) -> ReadExact<'a, Self> where Self: Unpin, { - ReadExact::new(self, buf) + assert_future::, _>(ReadExact::new(self, buf)) } /// Creates a future which will read all the bytes from this `AsyncRead`. @@ -288,7 +290,7 @@ pub trait AsyncReadExt: AsyncRead { ) -> ReadToEnd<'a, Self> where Self: Unpin, { - ReadToEnd::new(self, buf) + assert_future::, _>(ReadToEnd::new(self, buf)) } /// Creates a future which will read all the bytes from this `AsyncRead`. @@ -316,7 +318,7 @@ pub trait AsyncReadExt: AsyncRead { ) -> ReadToString<'a, Self> where Self: Unpin, { - ReadToString::new(self, buf) + assert_future::, _>(ReadToString::new(self, buf)) } /// Helper method for splitting this read/write object into two halves. @@ -351,7 +353,8 @@ pub trait AsyncReadExt: AsyncRead { fn split(self) -> (ReadHalf, WriteHalf) where Self: AsyncWrite + Sized, { - split::split(self) + let (r, w) = split::split(self); + (assert_read(r), assert_write(w)) } /// Creates an AsyncRead adapter which will read at most `limit` bytes @@ -376,7 +379,7 @@ pub trait AsyncReadExt: AsyncRead { fn take(self, limit: u64) -> Take where Self: Sized { - Take::new(self, limit) + assert_read(Take::new(self, limit)) } /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be @@ -423,14 +426,14 @@ pub trait AsyncWriteExt: AsyncWrite { fn flush(&mut self) -> Flush<'_, Self> where Self: Unpin, { - Flush::new(self) + assert_future::, _>(Flush::new(self)) } /// Creates a future which will entirely close this `AsyncWrite`. fn close(&mut self) -> Close<'_, Self> where Self: Unpin, { - Close::new(self) + assert_future::, _>(Close::new(self)) } /// Creates a future which will write bytes from `buf` into the object. @@ -440,7 +443,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin, { - Write::new(self, buf) + assert_future::, _>(Write::new(self, buf)) } /// Creates a future which will write bytes from `bufs` into the object using vectored @@ -451,7 +454,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin, { - WriteVectored::new(self, bufs) + assert_future::, _>(WriteVectored::new(self, bufs)) } /// Write data into this object. @@ -477,7 +480,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin, { - WriteAll::new(self, buf) + assert_future::, _>(WriteAll::new(self, buf)) } /// Attempts to write multiple buffers into this writer. @@ -532,7 +535,7 @@ pub trait AsyncWriteExt: AsyncWrite { where Self: Unpin, { - WriteAllVectored::new(self, bufs) + assert_future::, _>(WriteAllVectored::new(self, bufs)) } /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be @@ -577,7 +580,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn into_sink>(self) -> IntoSink where Self: Sized, { - IntoSink::new(self) + crate::sink::assert_sink::(IntoSink::new(self)) } } @@ -593,7 +596,7 @@ pub trait AsyncSeekExt: AsyncSeek { fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin, { - Seek::new(self, pos) + assert_future::, _>(Seek::new(self, pos)) } } @@ -627,7 +630,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { fn fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin, { - FillBuf::new(self) + assert_future::, _>(FillBuf::new(self)) } /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types. @@ -701,7 +704,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { ) -> ReadUntil<'a, Self> where Self: Unpin, { - ReadUntil::new(self, byte, buf) + assert_future::, _>(ReadUntil::new(self, byte, buf)) } /// Creates a future which will read all the bytes associated with this I/O @@ -758,7 +761,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin, { - ReadLine::new(self, buf) + assert_future::, _>(ReadLine::new(self, buf)) } /// Returns a stream over the lines of this reader. @@ -796,8 +799,25 @@ pub trait AsyncBufReadExt: AsyncBufRead { fn lines(self) -> Lines where Self: Sized, { - Lines::new(self) + assert_stream::, _>(Lines::new(self)) } } impl AsyncBufReadExt for R {} + +// Just a helper function to ensure the reader we're returning all have the +// right implementations. +pub(crate) fn assert_read(reader: R) -> R +where + R: AsyncRead, +{ + reader +} +// Just a helper function to ensure the writer we're returning all have the +// right implementations. +pub(crate) fn assert_write(writer: W) -> W +where + W: AsyncWrite, +{ + writer +} diff --git a/futures-util/src/sink/drain.rs b/futures-util/src/sink/drain.rs index 46de83b7c2..33c5b3167c 100644 --- a/futures-util/src/sink/drain.rs +++ b/futures-util/src/sink/drain.rs @@ -1,3 +1,4 @@ +use super::assert_sink; use crate::never::Never; use core::marker::PhantomData; use core::pin::Pin; @@ -26,7 +27,7 @@ pub struct Drain { /// # Ok::<(), futures::never::Never>(()) }).unwrap(); /// ``` pub fn drain() -> Drain { - Drain { marker: PhantomData } + assert_sink::(Drain { marker: PhantomData }) } impl Unpin for Drain {} diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index 1a062d005f..e5b515b64a 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -6,7 +6,7 @@ //! - The [`SinkExt`] trait, which provides adapters for chaining and composing //! sinks. -use crate::future::Either; +use crate::future::{assert_future, Either}; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, TryStream}; @@ -81,7 +81,7 @@ pub trait SinkExt: Sink { E: From, Self: Sized, { - With::new(self, f) + assert_sink::(With::new(self, f)) } /// Composes a function *in front of* the sink. @@ -122,7 +122,7 @@ pub trait SinkExt: Sink { St: Stream>, Self: Sized, { - WithFlatMap::new(self, f) + assert_sink::(WithFlatMap::new(self, f)) } /* @@ -145,7 +145,7 @@ pub trait SinkExt: Sink { F: FnOnce(Self::Error) -> E, Self: Sized, { - SinkMapErr::new(self, f) + assert_sink::(SinkMapErr::new(self, f)) } /// Map this sink's error to a different error type using the `Into` trait. @@ -156,7 +156,7 @@ pub trait SinkExt: Sink { Self: Sized, Self::Error: Into, { - SinkErrInto::new(self) + assert_sink::(SinkErrInto::new(self)) } /// Adds a fixed-size buffer to the current sink. @@ -176,7 +176,7 @@ pub trait SinkExt: Sink { where Self: Sized, { - Buffer::new(self, capacity) + assert_sink::(Buffer::new(self, capacity)) } /// Close the sink. @@ -184,7 +184,7 @@ pub trait SinkExt: Sink { where Self: Unpin, { - Close::new(self) + assert_future::, _>(Close::new(self)) } /// Fanout items to multiple sinks. @@ -197,7 +197,7 @@ pub trait SinkExt: Sink { Item: Clone, Si: Sink, { - Fanout::new(self, other) + assert_sink::(Fanout::new(self, other)) } /// Flush the sink, processing all pending items. @@ -208,7 +208,7 @@ pub trait SinkExt: Sink { where Self: Unpin, { - Flush::new(self) + assert_future::, _>(Flush::new(self)) } /// A future that completes after the given item has been fully processed @@ -221,7 +221,7 @@ pub trait SinkExt: Sink { where Self: Unpin, { - Send::new(self, item) + assert_future::, _>(Send::new(self, item)) } /// A future that completes after the given item has been received @@ -231,9 +231,10 @@ pub trait SinkExt: Sink { /// It is the caller's responsibility to ensure all pending items /// are processed, which can be done via `flush` or `close`. fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { - Feed::new(self, item) + assert_future::, _>(Feed::new(self, item)) } /// A future that completes after the given stream has been fully processed @@ -250,8 +251,11 @@ pub trait SinkExt: Sink { fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> where St: TryStream + Stream + Unpin + ?Sized, + // St: Stream> + Unpin + ?Sized, Self: Unpin, { + // TODO: type mismatch resolving `::Item == std::result::Result>::Error>` + // assert_future::, _>(SendAll::new(self, stream)) SendAll::new(self, stream) } @@ -265,7 +269,7 @@ pub trait SinkExt: Sink { Si2: Sink, Self: Sized, { - Either::Left(self) + assert_sink::(Either::Left(self)) } /// Wrap this stream in an `Either` stream, making it the right-hand variant @@ -278,7 +282,7 @@ pub trait SinkExt: Sink { Si1: Sink, Self: Sized, { - Either::Right(self) + assert_sink::(Either::Right(self)) } /// Wraps a [`Sink`] into a sink compatible with libraries using @@ -328,3 +332,12 @@ pub trait SinkExt: Sink { Pin::new(self).poll_close(cx) } } + +// Just a helper function to ensure the sinks we're returning all have the +// right implementations. +pub(crate) fn assert_sink(sink: S) -> S +where + S: Sink, +{ + sink +} diff --git a/futures-util/src/sink/unfold.rs b/futures-util/src/sink/unfold.rs index 1aab200dfb..3903716837 100644 --- a/futures-util/src/sink/unfold.rs +++ b/futures-util/src/sink/unfold.rs @@ -1,3 +1,4 @@ +use super::assert_sink; use crate::unfold_state::UnfoldState; use core::{future::Future, pin::Pin}; use futures_core::ready; @@ -40,10 +41,10 @@ where F: FnMut(T, Item) -> R, R: Future>, { - Unfold { + assert_sink::(Unfold { function, state: UnfoldState::Value { value: init }, - } + }) } impl Sink for Unfold diff --git a/futures-util/src/stream/empty.rs b/futures-util/src/stream/empty.rs index d228b31581..c629a4b7fc 100644 --- a/futures-util/src/stream/empty.rs +++ b/futures-util/src/stream/empty.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::marker::PhantomData; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -14,9 +15,9 @@ pub struct Empty { /// /// The returned stream will always return `Ready(None)` when polled. pub fn empty() -> Empty { - Empty { + assert_stream::(Empty { _phantom: PhantomData - } + }) } impl Unpin for Empty {} diff --git a/futures-util/src/stream/iter.rs b/futures-util/src/stream/iter.rs index cab8cd81b1..033dae1924 100644 --- a/futures-util/src/stream/iter.rs +++ b/futures-util/src/stream/iter.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -28,9 +29,9 @@ impl Unpin for Iter {} pub fn iter(i: I) -> Iter where I: IntoIterator, { - Iter { + assert_stream::(Iter { iter: i.into_iter(), - } + }) } impl Stream for Iter diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index a5624badac..f3b2baa408 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -109,11 +109,11 @@ cfg_target_has_atomic! { pub use self::select_all::{select_all, SelectAll}; } -// Just a helper function to ensure the futures we're returning all have the +// Just a helper function to ensure the streams we're returning all have the // right implementations. pub(crate) fn assert_stream(stream: S) -> S - where - S: Stream, +where + S: Stream, { stream } diff --git a/futures-util/src/stream/once.rs b/futures-util/src/stream/once.rs index 318de076d1..e16fe00148 100644 --- a/futures-util/src/stream/once.rs +++ b/futures-util/src/stream/once.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; @@ -17,7 +18,7 @@ use pin_project_lite::pin_project; /// # }); /// ``` pub fn once(future: Fut) -> Once { - Once::new(future) + assert_stream::(Once::new(future)) } pin_project! { diff --git a/futures-util/src/stream/pending.rs b/futures-util/src/stream/pending.rs index ca793c1e04..d7030ff3cc 100644 --- a/futures-util/src/stream/pending.rs +++ b/futures-util/src/stream/pending.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::marker; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -14,7 +15,7 @@ pub struct Pending { /// /// The returned stream will always return `Pending` when polled. pub fn pending() -> Pending { - Pending { _data: marker::PhantomData } + assert_stream::(Pending { _data: marker::PhantomData }) } impl Unpin for Pending {} diff --git a/futures-util/src/stream/poll_fn.rs b/futures-util/src/stream/poll_fn.rs index e33ca577ae..b9bd7d1664 100644 --- a/futures-util/src/stream/poll_fn.rs +++ b/futures-util/src/stream/poll_fn.rs @@ -1,5 +1,6 @@ //! Definition of the `PollFn` combinator +use super::assert_stream; use core::fmt; use core::pin::Pin; use futures_core::stream::Stream; @@ -41,7 +42,7 @@ pub fn poll_fn(f: F) -> PollFn where F: FnMut(&mut Context<'_>) -> Poll>, { - PollFn { f } + assert_stream::(PollFn { f }) } impl Stream for PollFn diff --git a/futures-util/src/stream/repeat.rs b/futures-util/src/stream/repeat.rs index 6a2637d3f5..cf9f21bcf2 100644 --- a/futures-util/src/stream/repeat.rs +++ b/futures-util/src/stream/repeat.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -26,7 +27,7 @@ pub struct Repeat { pub fn repeat(item: T) -> Repeat where T: Clone { - Repeat { item } + assert_stream::(Repeat { item }) } impl Unpin for Repeat {} diff --git a/futures-util/src/stream/repeat_with.rs b/futures-util/src/stream/repeat_with.rs index eb3313d8c4..0255643d57 100644 --- a/futures-util/src/stream/repeat_with.rs +++ b/futures-util/src/stream/repeat_with.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -89,5 +90,5 @@ impl A> FusedStream for RepeatWith /// # }); /// ``` pub fn repeat_with A>(repeater: F) -> RepeatWith { - RepeatWith { repeater } + assert_stream::(RepeatWith { repeater }) } diff --git a/futures-util/src/stream/select.rs b/futures-util/src/stream/select.rs index 2b7ebec5d6..2942494678 100644 --- a/futures-util/src/stream/select.rs +++ b/futures-util/src/stream/select.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -31,11 +32,11 @@ pub fn select(stream1: St1, stream2: St2) -> Select where St1: Stream, St2: Stream { - Select { + assert_stream::(Select { stream1: stream1.fuse(), stream2: stream2.fuse(), flag: false, - } + }) } impl Select { diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index 00368bb969..c0b92faabe 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -8,6 +8,7 @@ use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; +use super::assert_stream; use crate::stream::{StreamExt, StreamFuture, FuturesUnordered}; /// An unbounded set of streams @@ -124,7 +125,7 @@ pub fn select_all(streams: I) -> SelectAll set.push(stream); } - set + assert_stream::<::Item, _>(set) } impl FromIterator for SelectAll { diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index b1b4384279..081b71d175 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -4,8 +4,11 @@ //! including the `StreamExt` trait which adds methods to `Stream` types. use crate::future::{assert_future, Either}; +use crate::stream::assert_stream; #[cfg(feature = "alloc")] use alloc::boxed::Box; +#[cfg(feature = "alloc")] +use alloc::vec::Vec; use core::pin::Pin; #[cfg(feature = "sink")] use futures_core::stream::TryStream; @@ -19,7 +22,7 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; -use crate::fns::{InspectFn, inspect_fn}; +use crate::fns::{inspect_fn, InspectFn}; mod chain; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -201,7 +204,6 @@ mod catch_unwind; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::catch_unwind::CatchUnwind; -use crate::stream::assert_stream; impl StreamExt for T where T: Stream {} @@ -689,7 +691,7 @@ pub trait StreamExt: Stream { U: Stream, Self: Sized, { - FlatMap::new(self, f) + assert_stream::(FlatMap::new(self, f)) } /// Combinator similar to [`StreamExt::fold`] that holds internal state @@ -722,7 +724,7 @@ pub trait StreamExt: Stream { Fut: Future>, Self: Sized, { - Scan::new(self, initial_state, f) + assert_stream::(Scan::new(self, initial_state, f)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -827,7 +829,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - TakeUntil::new(self, fut) + assert_stream::(TakeUntil::new(self, fut)) } /// Runs this stream to completion, executing the provided asynchronous @@ -1289,7 +1291,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - assert_stream::, _>(Chunks::new(self, capacity)) + assert_stream::, _>(Chunks::new(self, capacity)) } /// An adaptor for chunking up ready items of the stream inside a vector. @@ -1312,10 +1314,10 @@ pub trait StreamExt: Stream { /// This method will panic if `capacity` is zero. #[cfg(feature = "alloc")] fn ready_chunks(self, capacity: usize) -> ReadyChunks - where - Self: Sized, + where + Self: Sized, { - ReadyChunks::new(self, capacity) + assert_stream::, _>(ReadyChunks::new(self, capacity)) } /// A future that completes after the given stream has been fully processed @@ -1334,7 +1336,10 @@ pub trait StreamExt: Stream { where S: Sink, Self: TryStream + Sized, + // Self: TryStream + Sized + Stream::Ok, ::Error>>, { + // TODO: type mismatch resolving `::Item == std::result::Result<::Ok, ::Error>` + // assert_future::, _>(Forward::new(self, sink)) Forward::new(self, sink) } @@ -1356,7 +1361,10 @@ pub trait StreamExt: Stream { Self: Sink + Sized, { let (sink, stream) = split::split(self); - (sink, assert_stream::(stream)) + ( + crate::sink::assert_sink::(sink), + assert_stream::(stream), + ) } /// Do something with each item of this stream, afterwards passing it on. @@ -1459,6 +1467,6 @@ pub trait StreamExt: Stream { where Self: Unpin + FusedStream, { - SelectNextSome::new(self) + assert_future::(SelectNextSome::new(self)) } } diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 6a48a4c4b4..b7353d908a 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -14,7 +14,9 @@ use futures_core::{ use crate::fns::{ InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, }; +use crate::future::assert_future; use crate::stream::{Map, Inspect}; +use crate::stream::assert_stream; mod and_then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -135,8 +137,6 @@ mod into_async_read; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; -use crate::future::assert_future; -use crate::stream::assert_stream; impl TryStreamExt for S {} @@ -471,7 +471,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture, Self: Sized, { - TryTakeWhile::new(self, f) + assert_stream::, _>(TryTakeWhile::new(self, f)) } /// Attempts to run this stream to completion, executing the provided asynchronous @@ -919,7 +919,7 @@ pub trait TryStreamExt: TryStream { Self::Ok: TryFuture, Self: Sized, { - TryBuffered::new(self, n) + assert_stream::::Ok, Self::Error>, _>(TryBuffered::new(self, n)) } // TODO: false positive warning from rustdoc. Verify once #43466 settles @@ -997,6 +997,6 @@ pub trait TryStreamExt: TryStream { Self: Sized + TryStreamExt + Unpin, Self::Ok: AsRef<[u8]>, { - IntoAsyncRead::new(self) + crate::io::assert_read(IntoAsyncRead::new(self)) } } diff --git a/futures-util/src/stream/try_stream/try_unfold.rs b/futures-util/src/stream/try_stream/try_unfold.rs index c8fc421386..258c18e461 100644 --- a/futures-util/src/stream/try_stream/try_unfold.rs +++ b/futures-util/src/stream/try_stream/try_unfold.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; @@ -60,11 +61,11 @@ where F: FnMut(T) -> Fut, Fut: TryFuture>, { - TryUnfold { + assert_stream::, _>(TryUnfold { f, state: Some(init), fut: None, - } + }) } pin_project! { diff --git a/futures-util/src/stream/unfold.rs b/futures-util/src/stream/unfold.rs index 473bb67bec..e17d46515c 100644 --- a/futures-util/src/stream/unfold.rs +++ b/futures-util/src/stream/unfold.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use crate::unfold_state::UnfoldState; use core::fmt; use core::pin::Pin; @@ -51,10 +52,10 @@ where F: FnMut(T) -> Fut, Fut: Future>, { - Unfold { + assert_stream::(Unfold { f, state: UnfoldState::Value { value: init }, - } + }) } pin_project! {