diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index d3bb7f33e4..4a35c39091 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -90,6 +90,11 @@ impl SelectAll { pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture> { self.project().inner.iter_pin_mut() } + + /// Clears the set, removing all futures. + pub fn clear(&mut self) { + self.inner.clear() + } } impl Default for SelectAll { diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index 7030cbac4d..70d00a549c 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -1,5 +1,5 @@ use futures::channel::mpsc; -use futures::executor::block_on_stream; +use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt}; use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; use futures::task::Poll; @@ -76,3 +76,26 @@ fn works_1() { drop((a_tx, b_tx, c_tx)); assert_eq!(None, stream.next()); } + +#[test] +fn clear() { + let mut tasks = + select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(stream::iter(vec![3].into_iter())); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +}