From 7fb873369c1f5762721dc99a84aa84a470341f4c Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Thu, 6 May 2021 03:28:06 -0400 Subject: [PATCH 01/15] async test function attr --- futures-macro/src/executor.rs | 27 +++++++++++++++++++++++++++ futures-macro/src/lib.rs | 7 +++++++ futures-test/Cargo.toml | 1 + futures-test/src/lib.rs | 14 ++++++++++++++ 4 files changed, 49 insertions(+) create mode 100644 futures-macro/src/executor.rs diff --git a/futures-macro/src/executor.rs b/futures-macro/src/executor.rs new file mode 100644 index 0000000000..ac008acc7f --- /dev/null +++ b/futures-macro/src/executor.rs @@ -0,0 +1,27 @@ +use proc_macro::TokenStream; +use quote::quote; + +pub(crate) fn test(_: TokenStream, item: TokenStream) -> TokenStream { + let mut input = syn::parse_macro_input!(item as syn::ItemFn); + let attrs = &input.attrs; + let vis = &input.vis; + let sig = &mut input.sig; + let body = &input.block; + + if sig.asyncness.is_none() { + return syn::Error::new_spanned(sig.fn_token, "Only async functions are supported") + .to_compile_error() + .into(); + } + + sig.asyncness = None; + + let gen = quote! { + #(#attrs)* + #vis #sig { + ::futures_test::__private::block_on(async move { #body }) + } + }; + + gen.into() +} diff --git a/futures-macro/src/lib.rs b/futures-macro/src/lib.rs index 98408ebfe6..f3cc774142 100644 --- a/futures-macro/src/lib.rs +++ b/futures-macro/src/lib.rs @@ -14,6 +14,7 @@ extern crate proc_macro; use proc_macro::TokenStream; +mod executor; mod join; mod select; @@ -44,3 +45,9 @@ pub fn select_internal(input: TokenStream) -> TokenStream { pub fn select_biased_internal(input: TokenStream) -> TokenStream { crate::select::select_biased(input) } + +/// The `test` attribute. +#[proc_macro_attribute] +pub fn test_internal(input: TokenStream, item: TokenStream) -> TokenStream { + crate::executor::test(input, item) +} diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index 4835f447b0..0e725f873f 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -18,6 +18,7 @@ futures-io = { version = "0.3.14", path = "../futures-io", default-features = fa futures-util = { version = "0.3.14", path = "../futures-util", default-features = false } futures-executor = { version = "0.3.14", path = "../futures-executor", default-features = false } futures-sink = { version = "0.3.14", path = "../futures-sink", default-features = false } +futures-macro = { version = "=0.3.14", path = "../futures-macro", default-features = false } pin-utils = { version = "0.1.0", default-features = false } pin-project = "1.0.1" diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs index 4c26a56987..109cc95434 100644 --- a/futures-test/src/lib.rs +++ b/futures-test/src/lib.rs @@ -16,6 +16,7 @@ compile_error!( #[cfg(feature = "std")] pub mod __private { pub use futures_core::{future, stream, task}; + pub use futures_executor::block_on; pub use std::{ option::Option::{None, Some}, pin::Pin, @@ -49,3 +50,16 @@ pub mod io; mod assert_unmoved; mod interleave_pending; mod track_closed; + +/// Enables an `async` test function. The generated future will be run to completion with +/// [`futures_executor::block_on`](futures_executor::block_on). +/// +/// ```no_run +/// #[futures_test::test] +/// async fn my_test() { +/// let fut = async { true }; +/// assert!(fut.await); +/// } +/// ``` +#[cfg(feature = "std")] +pub use futures_macro::test_internal as test; From 1e86c43eeda17ef47b2fbb97b90e2e3b8d4dcb10 Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Thu, 6 May 2021 03:39:52 -0400 Subject: [PATCH 02/15] don't skip async-test macro doc tests --- futures-test/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs index 109cc95434..00cd980903 100644 --- a/futures-test/src/lib.rs +++ b/futures-test/src/lib.rs @@ -54,7 +54,7 @@ mod track_closed; /// Enables an `async` test function. The generated future will be run to completion with /// [`futures_executor::block_on`](futures_executor::block_on). /// -/// ```no_run +/// ``` /// #[futures_test::test] /// async fn my_test() { /// let fut = async { true }; From 99c786eec474b3b79c3cfd9ac541ab65a91bb24c Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 6 May 2021 11:38:38 -0400 Subject: [PATCH 03/15] Apply suggestions from code review Co-authored-by: Taiki Endo --- futures-macro/src/executor.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/futures-macro/src/executor.rs b/futures-macro/src/executor.rs index ac008acc7f..48f510ffaa 100644 --- a/futures-macro/src/executor.rs +++ b/futures-macro/src/executor.rs @@ -1,25 +1,29 @@ use proc_macro::TokenStream; use quote::quote; -pub(crate) fn test(_: TokenStream, item: TokenStream) -> TokenStream { +pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream { + if !args.is_empty() { + return syn::Error::new_spanned(args, "invalid argument")) + .to_compile_error() + .into(); + } let mut input = syn::parse_macro_input!(item as syn::ItemFn); let attrs = &input.attrs; let vis = &input.vis; let sig = &mut input.sig; let body = &input.block; - if sig.asyncness.is_none() { + if sig.asyncness.take().is_none() { return syn::Error::new_spanned(sig.fn_token, "Only async functions are supported") .to_compile_error() .into(); } - sig.asyncness = None; - let gen = quote! { + #[::core::prelude::v1::test] #(#attrs)* #vis #sig { - ::futures_test::__private::block_on(async move { #body }) + ::futures_test::__private::block_on(async move #body) } }; From b38bffe026fd4f9db71a7a6fe51e953e1ef3ba18 Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Thu, 6 May 2021 11:57:42 -0400 Subject: [PATCH 04/15] add tests for async-test macro --- futures-macro/src/executor.rs | 3 ++- futures/tests/test_macro.rs | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 futures/tests/test_macro.rs diff --git a/futures-macro/src/executor.rs b/futures-macro/src/executor.rs index 48f510ffaa..1efb48c7c7 100644 --- a/futures-macro/src/executor.rs +++ b/futures-macro/src/executor.rs @@ -3,10 +3,11 @@ use quote::quote; pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream { if !args.is_empty() { - return syn::Error::new_spanned(args, "invalid argument")) + return syn::Error::new_spanned(proc_macro2::TokenStream::from(args), "invalid argument") .to_compile_error() .into(); } + let mut input = syn::parse_macro_input!(item as syn::ItemFn); let attrs = &input.attrs; let vis = &input.vis; diff --git a/futures/tests/test_macro.rs b/futures/tests/test_macro.rs new file mode 100644 index 0000000000..4b3b44634e --- /dev/null +++ b/futures/tests/test_macro.rs @@ -0,0 +1,18 @@ +#[cfg(test)] +mod tests { + #[futures_test::test] + async fn it_works() { + let fut = async { true }; + assert!(fut.await); + + let fut = async { false }; + assert!(!fut.await); + } + + #[futures_test::test] + #[should_panic] + async fn it_is_being_run() { + let fut = async { false }; + assert!(fut.await); + } +} From f0444cb79ee5c356f5c0614367b7f369b0cb65c8 Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Fri, 7 May 2021 11:07:23 -0400 Subject: [PATCH 05/15] apply suggestions from code review --- futures-test/src/lib.rs | 12 ++++++++++++ futures/tests/test_macro.rs | 27 ++++++++++++--------------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs index 00cd980903..1117bb36cf 100644 --- a/futures-test/src/lib.rs +++ b/futures-test/src/lib.rs @@ -61,5 +61,17 @@ mod track_closed; /// assert!(fut.await); /// } /// ``` +/// +/// This is equivalent to the following code: +/// +/// ``` +/// #[test] +/// fn my_test() { +/// futures::executor::block_on(async move { +/// let fut = async { true }; +/// assert!(fut.await); +/// }) +/// } +/// ``` #[cfg(feature = "std")] pub use futures_macro::test_internal as test; diff --git a/futures/tests/test_macro.rs b/futures/tests/test_macro.rs index 4b3b44634e..2f391997ea 100644 --- a/futures/tests/test_macro.rs +++ b/futures/tests/test_macro.rs @@ -1,18 +1,15 @@ -#[cfg(test)] -mod tests { - #[futures_test::test] - async fn it_works() { - let fut = async { true }; - assert!(fut.await); +#[futures_test::test] +async fn it_works() { + let fut = async { true }; + assert!(fut.await); - let fut = async { false }; - assert!(!fut.await); - } + let fut = async { false }; + assert!(!fut.await); +} - #[futures_test::test] - #[should_panic] - async fn it_is_being_run() { - let fut = async { false }; - assert!(fut.await); - } +#[should_panic] +#[futures_test::test] +async fn it_is_being_run() { + let fut = async { false }; + assert!(fut.await); } From b96e27a35767e40c2da7d63d39cd7392f389eee0 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 8 May 2021 16:34:17 +0900 Subject: [PATCH 06/15] Remove extra parentheses from tools/fmt.sh --- tools/fmt.sh | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tools/fmt.sh b/tools/fmt.sh index 5438030aee..32f4f00748 100755 --- a/tools/fmt.sh +++ b/tools/fmt.sh @@ -5,7 +5,8 @@ # Usage: # ./tools/fmt.sh # -# This script is needed because `cargo fmt` cannot recognize modules defined inside macros. +# This is similar to `cargo fmt`, but unlike `cargo fmt`, it can recognize +# modules defined inside macros. # Refs: https://github.com/rust-lang/rustfmt/issues/4078 set -euo pipefail @@ -15,12 +16,13 @@ cd "$(cd "$(dirname "${0}")" && pwd)"/.. # shellcheck disable=SC2046 if [[ -z "${CI:-}" ]]; then - ( - # `cargo fmt` cannot recognize modules defined inside macros so run rustfmt directly. - rustfmt $(git ls-files "*.rs") - ) + # `cargo fmt` cannot recognize modules defined inside macros, so run + # rustfmt directly. + # Refs: https://github.com/rust-lang/rustfmt/issues/4078 + rustfmt $(git ls-files "*.rs") else - ( - rustfmt --check $(git ls-files "*.rs") - ) + # `cargo fmt` cannot recognize modules defined inside macros, so run + # rustfmt directly. + # Refs: https://github.com/rust-lang/rustfmt/issues/4078 + rustfmt --check $(git ls-files "*.rs") fi From fdc3aea87682106b001ef2ca098d6182be00e139 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 8 May 2021 17:16:08 +0900 Subject: [PATCH 07/15] Rename Take::limit_ field to limit This workaround is no longer needed. --- futures-util/src/io/take.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/futures-util/src/io/take.rs b/futures-util/src/io/take.rs index 680d2702dc..05830203d0 100644 --- a/futures-util/src/io/take.rs +++ b/futures-util/src/io/take.rs @@ -14,14 +14,13 @@ pin_project! { pub struct Take { #[pin] inner: R, - // Add '_' to avoid conflicts with `limit` method. - limit_: u64, + limit: u64, } } impl Take { pub(super) fn new(inner: R, limit: u64) -> Self { - Self { inner, limit_: limit } + Self { inner, limit } } /// Returns the remaining number of bytes that can be @@ -48,7 +47,7 @@ impl Take { /// # Ok::<(), Box>(()) }).unwrap(); /// ``` pub fn limit(&self) -> u64 { - self.limit_ + self.limit } /// Sets the number of bytes that can be read before this instance will @@ -78,7 +77,7 @@ impl Take { /// # Ok::<(), Box>(()) }).unwrap(); /// ``` pub fn set_limit(&mut self, limit: u64) { - self.limit_ = limit + self.limit = limit } delegate_access_inner!(inner, R, ()); @@ -92,13 +91,13 @@ impl AsyncRead for Take { ) -> Poll> { let this = self.project(); - if *this.limit_ == 0 { + if *this.limit == 0 { return Poll::Ready(Ok(0)); } - let max = cmp::min(buf.len() as u64, *this.limit_) as usize; + let max = cmp::min(buf.len() as u64, *this.limit) as usize; let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?; - *this.limit_ -= n as u64; + *this.limit -= n as u64; Poll::Ready(Ok(n)) } @@ -113,12 +112,12 @@ impl AsyncBufRead for Take { let this = self.project(); // Don't call into inner reader at all at EOF because it may still block - if *this.limit_ == 0 { + if *this.limit == 0 { return Poll::Ready(Ok(&[])); } let buf = ready!(this.inner.poll_fill_buf(cx)?); - let cap = cmp::min(buf.len() as u64, *this.limit_) as usize; + let cap = cmp::min(buf.len() as u64, *this.limit) as usize; Poll::Ready(Ok(&buf[..cap])) } @@ -126,8 +125,8 @@ impl AsyncBufRead for Take { let this = self.project(); // Don't let callers reset the limit by passing an overlarge value - let amt = cmp::min(amt as u64, *this.limit_) as usize; - *this.limit_ -= amt as u64; + let amt = cmp::min(amt as u64, *this.limit) as usize; + *this.limit -= amt as u64; this.inner.consume(amt); } } From 1c5a021559c4c877265bc6e743eb252b961822f2 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 10 May 2021 08:30:30 -0400 Subject: [PATCH 08/15] add FuturesUnordered::into_iter, make iter_pin_ref public (#2423) --- .../src/stream/futures_unordered/iter.rs | 46 +++++++++++++++++++ .../src/stream/futures_unordered/mod.rs | 39 ++++++++++++++-- futures/tests/auto_traits.rs | 7 +++ futures/tests/stream_futures_unordered.rs | 45 ++++++++++++++++++ 4 files changed, 134 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 17cde4fac4..59ea28efa6 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -29,6 +29,47 @@ pub struct IterPinRef<'a, Fut> { /// Immutable iterator over all the futures in the unordered set. pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); +#[derive(Debug)] +/// Owned iterator over all futures in the unordered set. +pub struct IntoIter { + pub(super) len: usize, + pub(super) inner: FuturesUnordered, +} + +impl Iterator for IntoIter { + type Item = Fut; + + fn next(&mut self) -> Option { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = self.inner.head_all.get_mut(); + + if (*task).is_null() { + return None; + } + + unsafe { + // Moving out of the future is safe because it is `Unpin` + let future = (*(**task).future.get()).take().unwrap(); + + // Mutable access to a previously shared `FuturesUnordered` implies + // that the other threads already released the object before the + // current thread acquired it, so relaxed ordering can be used and + // valid `next_all` checks can be skipped. + let next = (**task).next_all.load(Relaxed); + *task = next; + self.len -= 1; + Some(future) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl ExactSizeIterator for IntoIter {} + impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; @@ -36,6 +77,7 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_mut().unwrap(); @@ -78,6 +120,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_ref().unwrap(); @@ -120,3 +163,6 @@ unsafe impl Sync for IterPinRef<'_, Fut> {} unsafe impl Send for IterPinMut<'_, Fut> {} unsafe impl Sync for IterPinMut<'_, Fut> {} + +unsafe impl Send for IntoIter {} +unsafe impl Sync for IntoIter {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index d1377ff327..89ce113d64 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; @@ -194,10 +194,11 @@ impl FuturesUnordered { } /// Returns an iterator that allows inspecting each future in the set. - fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { + pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); + let pending_next_all = self.pending_next_all(); - IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData } + IterPinRef { task, len, pending_next_all, _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. @@ -581,6 +582,38 @@ impl Drop for FuturesUnordered { } } +impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { + type Item = &'a Fut; + type IntoIter = Iter<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered { + type Item = &'a mut Fut; + type IntoIter = IterMut<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +impl IntoIterator for FuturesUnordered { + type Item = Fut; + type IntoIter = IntoIter; + + fn into_iter(mut self) -> Self::IntoIter { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = *self.head_all.get_mut(); + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; + + IntoIter { len, inner: self } + } +} + impl FromIterator for FuturesUnordered { fn from_iter(iter: I) -> Self where diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index e934595c2a..e0192a118b 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1827,6 +1827,13 @@ pub mod stream { assert_impl!(futures_unordered::IterPinRef<()>: Sync); assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); assert_impl!(futures_unordered::IterPinRef: Unpin); + + assert_impl!(futures_unordered::IntoIter<()>: Send); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); + assert_impl!(futures_unordered::IntoIter<()>: Sync); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync); + // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IntoIter: Unpin); } /// Assert Send/Sync/Unpin for all public types in `futures::task`. diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 93b9e293e3..3a5d41853d 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -214,6 +214,51 @@ fn iter_len() { assert!(iter.next().is_none()); } +#[test] +fn into_iter_cancel() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::>(); + + let stream = stream + .into_iter() + .map(|mut rx| { + rx.close(); + rx + }) + .collect::>(); + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn into_iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::>(); + + let mut into_iter = stream.into_iter(); + assert_eq!(into_iter.len(), 3); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 2); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 0); + assert!(into_iter.next().is_none()); +} + #[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, From 0924ecb1b993585f917fb674c6626381d7fec7cd Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 10 May 2021 09:13:20 -0400 Subject: [PATCH 09/15] Add FuturesUnordered::clear (#2415) --- .../src/stream/futures_unordered/mod.rs | 30 +++++++++++---- .../futures_unordered/ready_to_run_queue.rs | 37 +++++++++++++------ futures/tests/stream_futures_unordered.rs | 22 +++++++++++ 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 89ce113d64..a25fbe03ef 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -236,7 +236,7 @@ impl FuturesUnordered { (task, len) } - /// Releases the task. It destorys the future inside and either drops + /// Releases the task. It destroys the future inside and either drops /// the `Arc` or transfers ownership to the ready to run queue. /// The task this method is called on must have been unlinked before. fn release_task(&mut self, task: Arc>) { @@ -553,19 +553,33 @@ impl Debug for FuturesUnordered { } } +impl FuturesUnordered { + /// Clears the set, removing all futures. + pub fn clear(&mut self) { + self.clear_head_all(); + + // we just cleared all the tasks, and we have &mut self, so this is safe. + unsafe { self.ready_to_run_queue.clear() }; + + self.is_terminated.store(false, Relaxed); + } + + fn clear_head_all(&mut self) { + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + } + } +} + impl Drop for FuturesUnordered { fn drop(&mut self) { // When a `FuturesUnordered` is dropped we want to drop all futures // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. - unsafe { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = self.unlink(head); - self.release_task(task); - } - } + self.clear_head_all(); // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 3b34dc6e27..5ef6cde83d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -85,25 +85,38 @@ impl ReadyToRunQueue { pub(super) fn stub(&self) -> *const Task { &*self.stub } + + // Clear the queue of tasks. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. This method just pulls out + // tasks and drops their refcounts. + // + // # Safety + // + // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) + // - The caller **must** guarantee unique access to `self` + pub(crate) unsafe fn clear(&self) { + loop { + // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } + } } impl Drop for ReadyToRunQueue { fn drop(&mut self) { // Once we're in the destructor for `Inner` we need to clear out // the ready to run queue of tasks if there's anything left in there. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. All tasks should have had - // their futures dropped already by the `FuturesUnordered` destructor - // above, so we're just pulling out tasks and dropping their refcounts. + + // All tasks have had their futures dropped already by the `FuturesUnordered` + // destructor above, and we have &mut self, so this is safe. unsafe { - loop { - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } + self.clear(); } } } diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 3a5d41853d..4b9afccaf9 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -345,3 +345,25 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); } + +#[test] +fn clear() { + let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(future::ready(3)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} From 90db30bb829959a18ea0b8fd6a56c17e441ebabf Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 10 May 2021 09:18:13 -0400 Subject: [PATCH 10/15] Abortable streams (#2410) --- futures-util/src/abortable.rs | 185 +++++++++++++++++++++++++++ futures-util/src/future/abortable.rs | 158 +---------------------- futures-util/src/future/mod.rs | 4 +- futures-util/src/lib.rs | 5 + futures-util/src/stream/abortable.rs | 19 +++ futures-util/src/stream/mod.rs | 7 + futures/tests/future_abortable.rs | 5 + futures/tests/stream_abortable.rs | 46 +++++++ 8 files changed, 274 insertions(+), 155 deletions(-) create mode 100644 futures-util/src/abortable.rs create mode 100644 futures-util/src/stream/abortable.rs create mode 100644 futures/tests/stream_abortable.rs diff --git a/futures-util/src/abortable.rs b/futures-util/src/abortable.rs new file mode 100644 index 0000000000..bb82dd0db8 --- /dev/null +++ b/futures-util/src/abortable.rs @@ -0,0 +1,185 @@ +use crate::task::AtomicWaker; +use alloc::sync::Arc; +use core::fmt; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, Ordering}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_core::Stream; +use pin_project_lite::pin_project; + +pin_project! { + /// A future/stream which can be remotely short-circuited using an `AbortHandle`. + #[derive(Debug, Clone)] + #[must_use = "futures/streams do nothing unless you poll them"] + pub struct Abortable { + #[pin] + task: T, + inner: Arc, + } +} + +impl Abortable { + /// Creates a new `Abortable` future/stream using an existing `AbortRegistration`. + /// `AbortRegistration`s can be acquired through `AbortHandle::new`. + /// + /// When `abort` is called on the handle tied to `reg` or if `abort` has + /// already been called, the future/stream will complete immediately without making + /// any further progress. + /// + /// # Examples: + /// + /// Usage with futures: + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::{Abortable, AbortHandle, Aborted}; + /// + /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); + /// let future = Abortable::new(async { 2 }, abort_registration); + /// abort_handle.abort(); + /// assert_eq!(future.await, Err(Aborted)); + /// # }); + /// ``` + /// + /// Usage with streams: + /// + /// ``` + /// # futures::executor::block_on(async { + /// # use futures::future::{Abortable, AbortHandle}; + /// # use futures::stream::{self, StreamExt}; + /// + /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); + /// let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration); + /// abort_handle.abort(); + /// assert_eq!(stream.next().await, None); + /// # }); + /// ``` + pub fn new(task: T, reg: AbortRegistration) -> Self { + Self { task, inner: reg.inner } + } + + /// Checks whether the task has been aborted. Note that all this + /// method indicates is whether [`AbortHandle::abort`] was *called*. + /// This means that it will return `true` even if: + /// * `abort` was called after the task had completed. + /// * `abort` was called while the task was being polled - the task may still be running and + /// will not be stopped until `poll` returns. + pub fn is_aborted(&self) -> bool { + self.inner.aborted.load(Ordering::Relaxed) + } +} + +/// A registration handle for an `Abortable` task. +/// Values of this type can be acquired from `AbortHandle::new` and are used +/// in calls to `Abortable::new`. +#[derive(Debug)] +pub struct AbortRegistration { + inner: Arc, +} + +/// A handle to an `Abortable` task. +#[derive(Debug, Clone)] +pub struct AbortHandle { + inner: Arc, +} + +impl AbortHandle { + /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used + /// to abort a running future or stream. + /// + /// This function is usually paired with a call to [`Abortable::new`]. + pub fn new_pair() -> (Self, AbortRegistration) { + let inner = + Arc::new(AbortInner { waker: AtomicWaker::new(), aborted: AtomicBool::new(false) }); + + (Self { inner: inner.clone() }, AbortRegistration { inner }) + } +} + +// Inner type storing the waker to awaken and a bool indicating that it +// should be aborted. +#[derive(Debug)] +struct AbortInner { + waker: AtomicWaker, + aborted: AtomicBool, +} + +/// Indicator that the `Abortable` task was aborted. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct Aborted; + +impl fmt::Display for Aborted { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "`Abortable` future has been aborted") + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Aborted {} + +impl Abortable { + fn try_poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + poll: impl Fn(Pin<&mut T>, &mut Context<'_>) -> Poll, + ) -> Poll> { + // Check if the task has been aborted + if self.is_aborted() { + return Poll::Ready(Err(Aborted)); + } + + // attempt to complete the task + if let Poll::Ready(x) = poll(self.as_mut().project().task, cx) { + return Poll::Ready(Ok(x)); + } + + // Register to receive a wakeup if the task is aborted in the future + self.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `is_aborted` which uses `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if self.is_aborted() { + return Poll::Ready(Err(Aborted)); + } + + Poll::Pending + } +} + +impl Future for Abortable +where + Fut: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.try_poll(cx, |fut, cx| fut.poll(cx)) + } +} + +impl Stream for Abortable +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.try_poll(cx, |stream, cx| stream.poll_next(cx)).map(Result::ok).map(Option::flatten) + } +} + +impl AbortHandle { + /// Abort the `Abortable` stream/future associated with this handle. + /// + /// Notifies the Abortable task associated with this handle that it + /// should abort. Note that if the task is currently being polled on + /// another thread, it will not immediately stop running. Instead, it will + /// continue to run until its poll method returns. + pub fn abort(&self) { + self.inner.aborted.store(true, Ordering::Relaxed); + self.inner.waker.wake(); + } +} diff --git a/futures-util/src/future/abortable.rs b/futures-util/src/future/abortable.rs index 198cc8e668..d017ab7340 100644 --- a/futures-util/src/future/abortable.rs +++ b/futures-util/src/future/abortable.rs @@ -1,101 +1,8 @@ use super::assert_future; -use crate::task::AtomicWaker; -use alloc::sync::Arc; -use core::fmt; -use core::pin::Pin; -use core::sync::atomic::{AtomicBool, Ordering}; +use crate::future::{AbortHandle, Abortable, Aborted}; use futures_core::future::Future; -use futures_core::task::{Context, Poll}; -use pin_project_lite::pin_project; -pin_project! { - /// A future which can be remotely short-circuited using an `AbortHandle`. - #[derive(Debug, Clone)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Abortable { - #[pin] - future: Fut, - inner: Arc, - } -} - -impl Abortable -where - Fut: Future, -{ - /// Creates a new `Abortable` future using an existing `AbortRegistration`. - /// `AbortRegistration`s can be acquired through `AbortHandle::new`. - /// - /// When `abort` is called on the handle tied to `reg` or if `abort` has - /// already been called, the future will complete immediately without making - /// any further progress. - /// - /// Example: - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future::{Abortable, AbortHandle, Aborted}; - /// - /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); - /// let future = Abortable::new(async { 2 }, abort_registration); - /// abort_handle.abort(); - /// assert_eq!(future.await, Err(Aborted)); - /// # }); - /// ``` - pub fn new(future: Fut, reg: AbortRegistration) -> Self { - assert_future::, _>(Self { future, inner: reg.inner }) - } -} - -/// A registration handle for a `Abortable` future. -/// Values of this type can be acquired from `AbortHandle::new` and are used -/// in calls to `Abortable::new`. -#[derive(Debug)] -pub struct AbortRegistration { - inner: Arc, -} - -/// A handle to a `Abortable` future. -#[derive(Debug, Clone)] -pub struct AbortHandle { - inner: Arc, -} - -impl AbortHandle { - /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used - /// to abort a running future. - /// - /// This function is usually paired with a call to `Abortable::new`. - /// - /// Example: - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future::{Abortable, AbortHandle, Aborted}; - /// - /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); - /// let future = Abortable::new(async { 2 }, abort_registration); - /// abort_handle.abort(); - /// assert_eq!(future.await, Err(Aborted)); - /// # }); - /// ``` - pub fn new_pair() -> (Self, AbortRegistration) { - let inner = - Arc::new(AbortInner { waker: AtomicWaker::new(), cancel: AtomicBool::new(false) }); - - (Self { inner: inner.clone() }, AbortRegistration { inner }) - } -} - -// Inner type storing the waker to awaken and a bool indicating that it -// should be cancelled. -#[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - cancel: AtomicBool, -} - -/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it. +/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it. /// /// This function is a convenient (but less flexible) alternative to calling /// `AbortHandle::new` and `Abortable::new` manually. @@ -107,63 +14,6 @@ where Fut: Future, { let (handle, reg) = AbortHandle::new_pair(); - (Abortable::new(future, reg), handle) -} - -/// Indicator that the `Abortable` future was aborted. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub struct Aborted; - -impl fmt::Display for Aborted { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "`Abortable` future has been aborted") - } -} - -#[cfg(feature = "std")] -impl std::error::Error for Aborted {} - -impl Future for Abortable -where - Fut: Future, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Check if the future has been aborted - if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)); - } - - // attempt to complete the future - if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) { - return Poll::Ready(Ok(x)); - } - - // Register to receive a wakeup if the future is aborted in the... future - self.inner.waker.register(cx.waker()); - - // Check to see if the future was aborted between the first check and - // registration. - // Checking with `Relaxed` is sufficient because `register` introduces an - // `AcqRel` barrier. - if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)); - } - - Poll::Pending - } -} - -impl AbortHandle { - /// Abort the `Abortable` future associated with this handle. - /// - /// Notifies the Abortable future associated with this handle that it - /// should abort. Note that if the future is currently being polled on - /// another thread, it will not immediately stop running. Instead, it will - /// continue to run until its poll method returns. - pub fn abort(&self) { - self.inner.cancel.store(true, Ordering::Relaxed); - self.inner.waker.wake(); - } + let abortable = assert_future::, _>(Abortable::new(future, reg)); + (abortable, handle) } diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 8d9152b6c9..7a63e5ff85 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -112,7 +112,9 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] mod abortable; #[cfg(feature = "alloc")] - pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted}; + pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted}; + #[cfg(feature = "alloc")] + pub use abortable::abortable; } // Just a helper function to ensure the futures we're returning all have the diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 76d6ca7666..16871cb4b5 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -336,5 +336,10 @@ pub use crate::io::{ #[cfg(feature = "alloc")] pub mod lock; +cfg_target_has_atomic! { + #[cfg(feature = "alloc")] + mod abortable; +} + mod fns; mod unfold_state; diff --git a/futures-util/src/stream/abortable.rs b/futures-util/src/stream/abortable.rs new file mode 100644 index 0000000000..1fea895822 --- /dev/null +++ b/futures-util/src/stream/abortable.rs @@ -0,0 +1,19 @@ +use super::assert_stream; +use crate::stream::{AbortHandle, Abortable}; +use crate::Stream; + +/// Creates a new `Abortable` stream and an `AbortHandle` which can be used to stop it. +/// +/// This function is a convenient (but less flexible) alternative to calling +/// `AbortHandle::new` and `Abortable::new` manually. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn abortable(stream: St) -> (Abortable, AbortHandle) +where + St: Stream, +{ + let (handle, reg) = AbortHandle::new_pair(); + let abortable = assert_stream::(Abortable::new(stream, reg)); + (abortable, handle) +} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index ec6c68bbf3..2eac9711a1 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -107,6 +107,13 @@ cfg_target_has_atomic! { mod select_all; #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; + + #[cfg(feature = "alloc")] + mod abortable; + #[cfg(feature = "alloc")] + pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted}; + #[cfg(feature = "alloc")] + pub use abortable::abortable; } // Just a helper function to ensure the streams we're returning all have the diff --git a/futures/tests/future_abortable.rs b/futures/tests/future_abortable.rs index 5925c9a27b..e119f0b719 100644 --- a/futures/tests/future_abortable.rs +++ b/futures/tests/future_abortable.rs @@ -10,6 +10,7 @@ fn abortable_works() { let (abortable_rx, abort_handle) = abortable(a_rx); abort_handle.abort(); + assert!(abortable_rx.is_aborted()); assert_eq!(Err(Aborted), block_on(abortable_rx)); } @@ -20,11 +21,14 @@ fn abortable_awakens() { let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); + assert_eq!(counter, 0); assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx)); assert_eq!(counter, 0); + abort_handle.abort(); assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx)); } @@ -35,5 +39,6 @@ fn abortable_resolves() { tx.send(()).unwrap(); + assert!(!abortable_rx.is_aborted()); assert_eq!(Ok(Ok(())), block_on(abortable_rx)); } diff --git a/futures/tests/stream_abortable.rs b/futures/tests/stream_abortable.rs new file mode 100644 index 0000000000..2339dd0522 --- /dev/null +++ b/futures/tests/stream_abortable.rs @@ -0,0 +1,46 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::stream::{abortable, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures::SinkExt; +use futures_test::task::new_count_waker; +use std::pin::Pin; + +#[test] +fn abortable_works() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + abort_handle.abort(); + assert!(abortable_rx.is_aborted()); + assert_eq!(None, block_on(abortable_rx.next())); +} + +#[test] +fn abortable_awakens() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + assert_eq!(counter, 0); + assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); + assert_eq!(counter, 0); + + abort_handle.abort(); + assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); + assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); +} + +#[test] +fn abortable_resolves() { + let (mut tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, _abort_handle) = abortable(a_rx); + + block_on(tx.send(())).unwrap(); + + assert!(!abortable_rx.is_aborted()); + assert_eq!(Some(()), block_on(abortable_rx.next())); +} From 838570c01c149317e89f4453441bcdc9209ed53d Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 10 May 2021 09:45:04 -0400 Subject: [PATCH 11/15] expose iterators from SelectAll (#2428) --- futures-util/src/stream/select_all.rs | 83 ++++++++++++++++++++++----- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index 6b17bad125..e9ab519e5e 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -8,24 +8,30 @@ use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + use super::assert_stream; +use crate::stream::futures_unordered::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; use crate::stream::{FuturesUnordered, StreamExt, StreamFuture}; -/// An unbounded set of streams -/// -/// This "combinator" provides the ability to maintain a set of streams -/// and drive them all to completion. -/// -/// Streams are pushed into this set and their realized values are -/// yielded as they become ready. Streams will only be polled when they -/// generate notifications. This allows to coordinate a large number of streams. -/// -/// Note that you can create a ready-made `SelectAll` via the -/// `select_all` function in the `stream` module, or you can start with an -/// empty set with the `SelectAll::new` constructor. -#[must_use = "streams do nothing unless polled"] -pub struct SelectAll { - inner: FuturesUnordered>, +pin_project! { + /// An unbounded set of streams + /// + /// This "combinator" provides the ability to maintain a set of streams + /// and drive them all to completion. + /// + /// Streams are pushed into this set and their realized values are + /// yielded as they become ready. Streams will only be polled when they + /// generate notifications. This allows to coordinate a large number of streams. + /// + /// Note that you can create a ready-made `SelectAll` via the + /// `select_all` function in the `stream` module, or you can start with an + /// empty set with the `SelectAll::new` constructor. + #[must_use = "streams do nothing unless polled"] + pub struct SelectAll { + #[pin] + inner: FuturesUnordered>, + } } impl Debug for SelectAll { @@ -64,6 +70,26 @@ impl SelectAll { pub fn push(&mut self, stream: St) { self.inner.push(stream.into_future()); } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter(&self) -> Iter<'_, StreamFuture> { + self.inner.iter() + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&'_ Self>) -> IterPinRef<'_, StreamFuture> { + self.project_ref().inner.iter_pin_ref() + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> IterMut<'_, StreamFuture> { + self.inner.iter_mut() + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture> { + self.project().inner.iter_pin_mut() + } } impl Default for SelectAll { @@ -139,3 +165,30 @@ impl Extend for SelectAll { } } } + +impl IntoIterator for SelectAll { + type Item = StreamFuture; + type IntoIter = IntoIter>; + + fn into_iter(self) -> Self::IntoIter { + self.inner.into_iter() + } +} + +impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll { + type Item = &'a StreamFuture; + type IntoIter = Iter<'a, StreamFuture>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll { + type Item = &'a mut StreamFuture; + type IntoIter = IterMut<'a, StreamFuture>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} From efac9598fc350f746f798b54c05a19ec30780cf2 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 10 May 2021 10:43:50 -0400 Subject: [PATCH 12/15] Add SelectAll::clear (#2430) --- futures-util/src/stream/select_all.rs | 5 +++++ futures/tests/stream_select_all.rs | 25 ++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index e9ab519e5e..145099cb29 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -90,6 +90,11 @@ impl SelectAll { pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture> { self.project().inner.iter_pin_mut() } + + /// Clears the set, removing all futures. + pub fn clear(&mut self) { + self.inner.clear() + } } impl Default for SelectAll { diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index eb711dda0c..e7e3976398 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -1,5 +1,5 @@ use futures::channel::mpsc; -use futures::executor::block_on_stream; +use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt}; use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; use futures::task::Poll; @@ -76,3 +76,26 @@ fn works_1() { drop((a_tx, b_tx, c_tx)); assert_eq!(None, stream.next()); } + +#[test] +fn clear() { + let mut tasks = + select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(stream::iter(vec![3].into_iter())); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} From 91385742d3fd43e74cd26900f16ba7b0e951e83d Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 11 May 2021 00:25:15 +0900 Subject: [PATCH 13/15] Change SelectAll iterators to return stream instead of StreamFuture (#2431) --- .../src/stream/futures_unordered/iter.rs | 20 ++-- futures-util/src/stream/mod.rs | 2 +- futures-util/src/stream/select_all.rs | 109 +++++++++++++----- futures/tests/stream_select_all.rs | 96 +++++++++++++++ 4 files changed, 189 insertions(+), 38 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 59ea28efa6..04db5ee753 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -4,20 +4,20 @@ use core::marker::PhantomData; use core::pin::Pin; use core::sync::atomic::Ordering::Relaxed; -#[derive(Debug)] /// Mutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterPinMut<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, } -#[derive(Debug)] /// Mutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); -#[derive(Debug)] /// Immutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, @@ -25,12 +25,12 @@ pub struct IterPinRef<'a, Fut> { pub(super) _marker: PhantomData<&'a FuturesUnordered>, } -#[derive(Debug)] /// Immutable iterator over all the futures in the unordered set. +#[derive(Debug)] pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); -#[derive(Debug)] /// Owned iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IntoIter { pub(super) len: usize, pub(super) inner: FuturesUnordered, @@ -39,7 +39,7 @@ pub struct IntoIter { impl Iterator for IntoIter { type Item = Fut; - fn next(&mut self) -> Option { + fn next(&mut self) -> Option { // `head_all` can be accessed directly and we don't need to spin on // `Task::next_all` since we have exclusive access to the set. let task = self.inner.head_all.get_mut(); @@ -73,7 +73,7 @@ impl ExactSizeIterator for IntoIter {} impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { if self.task.is_null() { return None; } @@ -102,7 +102,7 @@ impl ExactSizeIterator for IterPinMut<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { type Item = &'a mut Fut; - fn next(&mut self) -> Option<&'a mut Fut> { + fn next(&mut self) -> Option { self.0.next().map(Pin::get_mut) } @@ -116,7 +116,7 @@ impl ExactSizeIterator for IterMut<'_, Fut> {} impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { type Item = Pin<&'a Fut>; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { if self.task.is_null() { return None; } @@ -145,7 +145,7 @@ impl ExactSizeIterator for IterPinRef<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { type Item = &'a Fut; - fn next(&mut self) -> Option<&'a Fut> { + fn next(&mut self) -> Option { self.0.next().map(Pin::get_ref) } diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 2eac9711a1..0b2fc90532 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -104,7 +104,7 @@ cfg_target_has_atomic! { pub use self::futures_unordered::FuturesUnordered; #[cfg(feature = "alloc")] - mod select_all; + pub mod select_all; #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index 145099cb29..3474331adc 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -11,8 +11,7 @@ use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; use super::assert_stream; -use crate::stream::futures_unordered::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; -use crate::stream::{FuturesUnordered, StreamExt, StreamFuture}; +use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; pin_project! { /// An unbounded set of streams @@ -71,27 +70,17 @@ impl SelectAll { self.inner.push(stream.into_future()); } - /// Returns an iterator that allows inspecting each future in the set. - pub fn iter(&self) -> Iter<'_, StreamFuture> { - self.inner.iter() + /// Returns an iterator that allows inspecting each stream in the set. + pub fn iter(&self) -> Iter<'_, St> { + Iter(self.inner.iter()) } - /// Returns an iterator that allows inspecting each future in the set. - pub fn iter_pin_ref(self: Pin<&'_ Self>) -> IterPinRef<'_, StreamFuture> { - self.project_ref().inner.iter_pin_ref() + /// Returns an iterator that allows modifying each stream in the set. + pub fn iter_mut(&mut self) -> IterMut<'_, St> { + IterMut(self.inner.iter_mut()) } - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> IterMut<'_, StreamFuture> { - self.inner.iter_mut() - } - - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture> { - self.project().inner.iter_pin_mut() - } - - /// Clears the set, removing all futures. + /// Clears the set, removing all streams. pub fn clear(&mut self) { self.inner.clear() } @@ -139,7 +128,7 @@ impl FusedStream for SelectAll { /// streams internally, in the order they become available. /// /// Note that the returned set can also be used to dynamically push more -/// futures into the set as they become available. +/// streams into the set as they become available. /// /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. @@ -172,17 +161,17 @@ impl Extend for SelectAll { } impl IntoIterator for SelectAll { - type Item = StreamFuture; - type IntoIter = IntoIter>; + type Item = St; + type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { - self.inner.into_iter() + IntoIter(self.inner.into_iter()) } } impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll { - type Item = &'a StreamFuture; - type IntoIter = Iter<'a, StreamFuture>; + type Item = &'a St; + type IntoIter = Iter<'a, St>; fn into_iter(self) -> Self::IntoIter { self.iter() @@ -190,10 +179,76 @@ impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll { } impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll { - type Item = &'a mut StreamFuture; - type IntoIter = IterMut<'a, StreamFuture>; + type Item = &'a mut St; + type IntoIter = IterMut<'a, St>; fn into_iter(self) -> Self::IntoIter { self.iter_mut() } } + +/// Immutable iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture>); + +/// Mutable iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture>); + +/// Owned iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct IntoIter(futures_unordered::IntoIter>); + +impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> { + type Item = &'a St; + + fn next(&mut self) -> Option { + let st = self.0.next()?; + let next = st.get_ref(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for Iter<'_, St> {} + +impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> { + type Item = &'a mut St; + + fn next(&mut self) -> Option { + let st = self.0.next()?; + let next = st.get_mut(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for IterMut<'_, St> {} + +impl Iterator for IntoIter { + type Item = St; + + fn next(&mut self) -> Option { + let st = self.0.next()?; + let next = st.into_inner(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for IntoIter {} diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index e7e3976398..4ae0735762 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -99,3 +99,99 @@ fn clear() { tasks.clear(); assert!(!tasks.is_terminated()); } + +#[test] +fn iter_mut() { + let mut stream = + vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::>(); + + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn into_iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::>(); + + let mut iter = stream.into_iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} From 94abb0ad52ee1c4daf061eb47ba16bc211810055 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 11 May 2021 01:27:56 +0900 Subject: [PATCH 14/15] futures-macro: improve diagnostics on type mismatch (#2433) --- futures-macro/Cargo.toml | 2 +- futures-macro/src/executor.rs | 45 ++++++++++++++++++++++++++--------- futures/tests/test_macro.rs | 5 ++++ 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/futures-macro/Cargo.toml b/futures-macro/Cargo.toml index 9002a9f391..bb40701665 100644 --- a/futures-macro/Cargo.toml +++ b/futures-macro/Cargo.toml @@ -23,4 +23,4 @@ autocfg = "1" proc-macro2 = "1.0" proc-macro-hack = "0.5.19" quote = "1.0" -syn = { version = "1.0", features = ["full"] } +syn = { version = "1.0.56", features = ["full"] } diff --git a/futures-macro/src/executor.rs b/futures-macro/src/executor.rs index 1efb48c7c7..40a091f94c 100644 --- a/futures-macro/src/executor.rs +++ b/futures-macro/src/executor.rs @@ -1,5 +1,6 @@ use proc_macro::TokenStream; -use quote::quote; +use proc_macro2::Span; +use quote::{quote, quote_spanned, ToTokens}; pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream { if !args.is_empty() { @@ -9,23 +10,45 @@ pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream { } let mut input = syn::parse_macro_input!(item as syn::ItemFn); - let attrs = &input.attrs; - let vis = &input.vis; - let sig = &mut input.sig; - let body = &input.block; - if sig.asyncness.take().is_none() { - return syn::Error::new_spanned(sig.fn_token, "Only async functions are supported") + if input.sig.asyncness.take().is_none() { + return syn::Error::new_spanned(input.sig.fn_token, "Only async functions are supported") .to_compile_error() .into(); } + // If type mismatch occurs, the current rustc points to the last statement. + let (last_stmt_start_span, last_stmt_end_span) = { + let mut last_stmt = input + .block + .stmts + .last() + .map(ToTokens::into_token_stream) + .unwrap_or_default() + .into_iter(); + // `Span` on stable Rust has a limitation that only points to the first + // token, not the whole tokens. We can work around this limitation by + // using the first/last span of the tokens like + // `syn::Error::new_spanned` does. + let start = last_stmt.next().map_or_else(Span::call_site, |t| t.span()); + let end = last_stmt.last().map_or(start, |t| t.span()); + (start, end) + }; + + let path = quote_spanned! {last_stmt_start_span=> + ::futures_test::__private + }; + let body = &input.block; + input.block.stmts = vec![syn::Stmt::Expr( + syn::parse2(quote_spanned! {last_stmt_end_span=> + #path::block_on(async #body) + }) + .unwrap(), + )]; + let gen = quote! { #[::core::prelude::v1::test] - #(#attrs)* - #vis #sig { - ::futures_test::__private::block_on(async move #body) - } + #input }; gen.into() diff --git a/futures/tests/test_macro.rs b/futures/tests/test_macro.rs index 2f391997ea..6adf51d8bb 100644 --- a/futures/tests/test_macro.rs +++ b/futures/tests/test_macro.rs @@ -13,3 +13,8 @@ async fn it_is_being_run() { let fut = async { false }; assert!(fut.await); } + +#[futures_test::test] +async fn return_ty() -> Result<(), ()> { + Ok(()) +} From ce515433484804bf41ad5b7dc26f318e5fcdbd8b Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 11 May 2021 02:14:26 +0900 Subject: [PATCH 15/15] Remove extra text from test attribute docs (#2435) --- futures-macro/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/futures-macro/src/lib.rs b/futures-macro/src/lib.rs index f3cc774142..8835de4fa8 100644 --- a/futures-macro/src/lib.rs +++ b/futures-macro/src/lib.rs @@ -46,7 +46,8 @@ pub fn select_biased_internal(input: TokenStream) -> TokenStream { crate::select::select_biased(input) } -/// The `test` attribute. +// TODO: Change this to doc comment once rustdoc bug fixed. +// The `test` attribute. #[proc_macro_attribute] pub fn test_internal(input: TokenStream, item: TokenStream) -> TokenStream { crate::executor::test(input, item)