Skip to content

Commit

Permalink
Misc tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Jan 18, 2022
1 parent 952ccf7 commit 12bee98
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 111 deletions.
67 changes: 35 additions & 32 deletions futures-util/src/stream/stream/flatten_unordered.rs
Expand Up @@ -21,30 +21,30 @@ use futures_task::{waker, ArcWake};

use crate::stream::FuturesUnordered;

/// Indicates that there is nothing to poll and stream isn't being polled at
/// the moment.
/// There is nothing to poll and stream isn't being
/// polled or waking at the moment.
const NONE: u8 = 0;

/// Indicates that inner streams need to be polled.
/// Inner streams need to be polled.
const NEED_TO_POLL_INNER_STREAMS: u8 = 1;

/// Indicates that stream needs to be polled.
/// The base stream needs to be polled.
const NEED_TO_POLL_STREAM: u8 = 0b10;

/// Indicates that it needs to poll stream and inner streams.
/// It needs to poll base stream and inner streams.
const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;

/// Indicates that the current stream is being polled at the moment.
/// The current stream is being polled at the moment.
const POLLING: u8 = 0b100;

/// Indicates that inner streams are being waked at the moment.
/// Inner streams are being woken at the moment.
const WAKING_INNER_STREAMS: u8 = 0b1000;

/// Indicates that the current stream is being waked at the moment.
/// The base stream is being woken at the moment.
const WAKING_STREAM: u8 = 0b10000;

/// Indicates that the current stream or inner streams are being waked at the moment.
const WAKING_ANYTHING: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
/// The base stream or inner streams are being woken at the moment.
const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;

/// Determines what needs to be polled, and is stream being polled at the
/// moment or not.
Expand All @@ -64,7 +64,7 @@ impl SharedPollState {
fn start_polling(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState)>)> {
self.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
if value & WAKING_ANYTHING == NONE {
if value & WAKING_ALL == NONE {
Some(POLLING)
} else {
None
Expand Down Expand Up @@ -189,11 +189,14 @@ impl ArcWake for InnerWaker {
}

pin_project! {
/// Future which contains optional stream. If it's `Some`, it will attempt
/// to call `poll_next` on it, returning `Some((item, next_item_fut))` in
/// case of `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`.
/// If `poll_next` will return `Poll::Pending`, it will be forwared to
/// the future, and current task will be notified by waker.
/// Future which contains optional stream.
///
/// If it's `Some`, it will attempt to call `poll_next` on it,
/// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
/// or `None` in case of `Poll::Ready(None)`.
///
/// If `poll_next` will return `Poll::Pending`, it will be forwarded to
/// the future and current task will be notified by waker.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollStreamFut<St> {
#[pin]
Expand All @@ -213,12 +216,14 @@ impl<St: Stream + Unpin> Future for PollStreamFut<St> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut stream = self.project().stream;

let item = if let Some(stream) = stream.as_mut().as_pin_mut() {
ready!(stream.poll_next(cx))
} else {
None
};
let out = item.map(|item| (item, PollStreamFut::new(stream.get_mut().take())));
let next_item_fut = PollStreamFut::new(stream.get_mut().take());
let out = item.map(|item| (item, next_item_fut));

Poll::Ready(out)
}
Expand All @@ -229,9 +234,9 @@ pin_project! {
/// method.
#[project = FlattenUnorderedProj]
#[must_use = "streams do nothing unless polled"]
pub struct FlattenUnordered<St, U> {
pub struct FlattenUnordered<St> where St: Stream {
#[pin]
inner_streams: FuturesUnordered<PollStreamFut<U>>,
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
#[pin]
stream: St,
poll_state: SharedPollState,
Expand All @@ -242,7 +247,7 @@ pin_project! {
}
}

impl<St> fmt::Debug for FlattenUnordered<St, St::Item>
impl<St> fmt::Debug for FlattenUnordered<St>
where
St: Stream + fmt::Debug,
St::Item: Stream + fmt::Debug,
Expand All @@ -258,12 +263,12 @@ where
}
}

impl<St> FlattenUnordered<St, St::Item>
impl<St> FlattenUnordered<St>
where
St: Stream,
St::Item: Stream + Unpin,
{
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St, St::Item> {
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);

FlattenUnordered {
Expand All @@ -288,7 +293,7 @@ where
delegate_access_inner!(stream, St, ());
}

impl<St> FlattenUnorderedProj<'_, St, St::Item>
impl<St> FlattenUnorderedProj<'_, St>
where
St: Stream,
{
Expand All @@ -298,22 +303,20 @@ where
}
}

impl<St> FusedStream for FlattenUnordered<St, St::Item>
impl<St> FusedStream for FlattenUnordered<St>
where
St: FusedStream,
St::Item: FusedStream + Unpin,
<St::Item as Stream>::Item: core::fmt::Debug,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.inner_streams.is_empty()
}
}

impl<St> Stream for FlattenUnordered<St, St::Item>
impl<St> Stream for FlattenUnordered<St>
where
St: Stream,
St::Item: Stream + Unpin,
<St::Item as Stream>::Item: core::fmt::Debug,
{
type Item = <St::Item as Stream>::Item;

Expand All @@ -335,7 +338,7 @@ where
// Safety: now state is `POLLING`.
let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };

// Here we need to poll the inner stream.
// Here we need to poll the base stream.
//
// To improve performance, we will attempt to place as many items as we can
// to the `FuturesUnordered` bucket before polling inner streams
Expand Down Expand Up @@ -420,12 +423,12 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for FlattenUnordered<S, S::Item>
impl<St, Item> Sink<Item> for FlattenUnordered<St>
where
S: Stream + Sink<Item>,
S::Item: Stream,
St: Stream + Sink<Item>,
St::Item: Stream,
{
type Error = S::Error;
type Error = St::Error;

delegate_sink!(stream, Item);
}
9 changes: 2 additions & 7 deletions futures-util/src/stream/stream/mod.rs
Expand Up @@ -205,13 +205,8 @@ mod flatten_unordered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
delegate_all!(
/// Stream for the [`flatten_unordered`](StreamExt::flatten_unordered) method.
FlattenUnordered<St>(
flatten_unordered::FlattenUnordered<St, St::Item>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, limit: Option<usize>| flatten_unordered::FlattenUnordered::new(x, limit)]
where St: Stream, St::Item: Stream, St::Item: Unpin
);
#[allow(unreachable_pub)]
pub use self::flatten_unordered::FlattenUnordered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
Expand Down
156 changes: 84 additions & 72 deletions futures/tests/stream.rs
Expand Up @@ -147,100 +147,112 @@ fn flatten_unordered() {
}

// basic behaviour
block_on(async {
let st =
stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(10..=12)]);
{
block_on(async {
let st = stream::iter(vec![
stream::iter(0..=4u8),
stream::iter(6..=10),
stream::iter(10..=12),
]);

let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;

assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
});
assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
});

block_on(async {
let st =
stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]);
block_on(async {
let st = stream::iter(vec![
stream::iter(0..=4u8),
stream::iter(6..=10),
stream::iter(0..=2),
]);

let mut fm_unordered = st
.flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
.collect::<Vec<_>>()
.await;
let mut fm_unordered = st
.flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
.collect::<Vec<_>>()
.await;

fm_unordered.sort_unstable();
fm_unordered.sort_unstable();

assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
});
assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
});
}

// wake up immmediately
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;
// wake up immediately
{
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;

fl_unordered.sort_unstable();
fl_unordered.sort_unstable();

assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
.await;
block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
.await;

fm_unordered.sort_unstable();
fm_unordered.sort_unstable();

assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
}

// wake up after delay
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;

fl_unordered.sort_unstable();

assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
.await;
{
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;

fm_unordered.sort_unstable();
fl_unordered.sort_unstable();

assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
Interchanger { polled: false, base: 0, wake_immediately: false }
block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>(),
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
);
.await;

fm_unordered.sort_unstable();
fl_unordered.sort_unstable();
fm_unordered.sort_unstable();

assert_eq!(fm_unordered, fl_unordered);
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>(),
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
);

fm_unordered.sort_unstable();
fl_unordered.sort_unstable();

assert_eq!(fm_unordered, fl_unordered);
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
}

// waker panics
{
Expand Down

0 comments on commit 12bee98

Please sign in to comment.