From 7bb40e7aca813c7b5d7efcb510cffd9ef59fae13 Mon Sep 17 00:00:00 2001 From: conorbros Date: Wed, 20 Apr 2022 22:54:45 +1000 Subject: [PATCH 1/4] push_front and push_back --- futures-util/src/stream/futures_ordered.rs | 25 ++++++-- futures-util/src/stream/stream/buffered.rs | 2 +- .../src/stream/try_stream/try_buffered.rs | 2 +- futures/tests/stream_futures_ordered.rs | 64 +++++++++++++++++++ 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index f596b3b0e3..d53b0a82c6 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -129,17 +129,34 @@ impl FuturesOrdered { self.in_progress_queue.is_empty() && self.queued_outputs.is_empty() } - /// Push a future into the queue. + /// Pushes a future to the back of the queue. /// /// This function submits the given future to the internal set for managing. /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. - pub fn push(&mut self, future: Fut) { + pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } + + /// Pushes a future to the front of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. This future will be the next future to be returned + /// complete. + pub fn push_front(&mut self, future: Fut) { + if self.next_outgoing_index == 0 { + self.push_back(future) + } else { + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); + } + } } impl Default for FuturesOrdered { @@ -196,7 +213,7 @@ impl FromIterator for FuturesOrdered { { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { - acc.push(item); + acc.push_back(item); acc }) } @@ -214,7 +231,7 @@ impl Extend for FuturesOrdered { I: IntoIterator, { for item in iter { - self.push(item); + self.push_back(item); } } } diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 73af847e9c..d1db71c317 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -69,7 +69,7 @@ where // our queue of futures. while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 35d47b651a..83e9093fbc 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -55,7 +55,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures/tests/stream_futures_ordered.rs b/futures/tests/stream_futures_ordered.rs index 7506c65a63..8b85a3365a 100644 --- a/futures/tests/stream_futures_ordered.rs +++ b/futures/tests/stream_futures_ordered.rs @@ -2,6 +2,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, join, Future, FutureExt, TryFutureExt}; use futures::stream::{FuturesOrdered, StreamExt}; +use futures::task::Poll; use futures_test::task::noop_context; use std::any::Any; @@ -45,6 +46,69 @@ fn works_2() { assert!(stream.poll_next_unpin(&mut cx).is_ready()); } +#[test] +fn test_push_front() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // 1 and 2 should be received in order + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_front(d_rx); + d_tx.send(4).unwrap(); + + // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next + // and then 3 after it + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn test_push_back() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // All results should be received in order + + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_back(d_rx); + d_tx.send(4).unwrap(); + + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); +} + #[test] fn from_iterator() { let stream = vec![future::ready::(1), future::ready::(2), future::ready::(3)] From bbfd8ee81697fa829f158b975e4f7e11a7a89c3d Mon Sep 17 00:00:00 2001 From: conorbros Date: Sun, 5 Jun 2022 21:37:09 +1000 Subject: [PATCH 2/4] deprecate push method --- futures-util/src/stream/futures_ordered.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index d53b0a82c6..bb103640f6 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -129,6 +129,19 @@ impl FuturesOrdered { self.in_progress_queue.is_empty() && self.queued_outputs.is_empty() } + /// Push a future into the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + #[deprecated] + pub fn push(&mut self, future: Fut) { + let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; + self.next_incoming_index += 1; + self.in_progress_queue.push(wrapped); + } + /// Pushes a future to the back of the queue. /// /// This function submits the given future to the internal set for managing. From fc2acbb947ea185e093fb38d212f393cd242d7c8 Mon Sep 17 00:00:00 2001 From: conorbros Date: Sun, 5 Jun 2022 21:55:14 +1000 Subject: [PATCH 3/4] review feedback --- futures-util/src/stream/futures_ordered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index bb103640f6..e6410e1dc0 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -135,7 +135,7 @@ impl FuturesOrdered { /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. - #[deprecated] + #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; From bad0f0871e6a91e8f9609382dff40f6006b3d380 Mon Sep 17 00:00:00 2001 From: conorbros Date: Tue, 5 Jul 2022 16:17:45 +1000 Subject: [PATCH 4/4] review feedback --- futures-util/src/stream/futures_ordered.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index e6410e1dc0..f1c93fd683 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -137,9 +137,7 @@ impl FuturesOrdered { /// task notifications. #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { - let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; - self.next_incoming_index += 1; - self.in_progress_queue.push(wrapped); + self.push_back(future); } /// Pushes a future to the back of the queue.