Skip to content

Commit

Permalink
Misc tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Jan 17, 2022
1 parent 952ccf7 commit bf4a826
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 93 deletions.
28 changes: 14 additions & 14 deletions futures-util/src/stream/stream/flatten_unordered.rs
Expand Up @@ -192,7 +192,7 @@ pin_project! {
/// Future which contains optional stream. If it's `Some`, it will attempt
/// to call `poll_next` on it, returning `Some((item, next_item_fut))` in
/// case of `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`.
/// If `poll_next` will return `Poll::Pending`, it will be forwared to
/// If `poll_next` will return `Poll::Pending`, it will be forward to
/// the future, and current task will be notified by waker.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollStreamFut<St> {
Expand Down Expand Up @@ -229,9 +229,9 @@ pin_project! {
/// method.
#[project = FlattenUnorderedProj]
#[must_use = "streams do nothing unless polled"]
pub struct FlattenUnordered<St, U> {
pub struct FlattenUnordered<St> where St: Stream {
#[pin]
inner_streams: FuturesUnordered<PollStreamFut<U>>,
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
#[pin]
stream: St,
poll_state: SharedPollState,
Expand All @@ -242,7 +242,7 @@ pin_project! {
}
}

impl<St> fmt::Debug for FlattenUnordered<St, St::Item>
impl<St> fmt::Debug for FlattenUnordered<St>
where
St: Stream + fmt::Debug,
St::Item: Stream + fmt::Debug,
Expand All @@ -258,12 +258,12 @@ where
}
}

impl<St> FlattenUnordered<St, St::Item>
impl<St> FlattenUnordered<St>
where
St: Stream,
St::Item: Stream + Unpin,
{
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St, St::Item> {
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);

FlattenUnordered {
Expand All @@ -288,7 +288,7 @@ where
delegate_access_inner!(stream, St, ());
}

impl<St> FlattenUnorderedProj<'_, St, St::Item>
impl<St> FlattenUnorderedProj<'_, St>
where
St: Stream,
{
Expand All @@ -298,7 +298,7 @@ where
}
}

impl<St> FusedStream for FlattenUnordered<St, St::Item>
impl<St> FusedStream for FlattenUnordered<St>
where
St: FusedStream,
St::Item: FusedStream + Unpin,
Expand All @@ -309,7 +309,7 @@ where
}
}

impl<St> Stream for FlattenUnordered<St, St::Item>
impl<St> Stream for FlattenUnordered<St>
where
St: Stream,
St::Item: Stream + Unpin,
Expand All @@ -335,7 +335,7 @@ where
// Safety: now state is `POLLING`.
let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };

// Here we need to poll the inner stream.
// Here we need to poll the base stream.
//
// To improve performance, we will attempt to place as many items as we can
// to the `FuturesUnordered` bucket before polling inner streams
Expand Down Expand Up @@ -420,12 +420,12 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for FlattenUnordered<S, S::Item>
impl<St, Item> Sink<Item> for FlattenUnordered<St>
where
S: Stream + Sink<Item>,
S::Item: Stream,
St: Stream + Sink<Item>,
St::Item: Stream,
{
type Error = S::Error;
type Error = St::Error;

delegate_sink!(stream, Item);
}
9 changes: 2 additions & 7 deletions futures-util/src/stream/stream/mod.rs
Expand Up @@ -205,13 +205,8 @@ mod flatten_unordered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
delegate_all!(
/// Stream for the [`flatten_unordered`](StreamExt::flatten_unordered) method.
FlattenUnordered<St>(
flatten_unordered::FlattenUnordered<St, St::Item>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, limit: Option<usize>| flatten_unordered::FlattenUnordered::new(x, limit)]
where St: Stream, St::Item: Stream, St::Item: Unpin
);
#[allow(unreachable_pub)]
pub use self::flatten_unordered::FlattenUnordered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
Expand Down
156 changes: 84 additions & 72 deletions futures/tests/stream.rs
Expand Up @@ -147,100 +147,112 @@ fn flatten_unordered() {
}

// basic behaviour
block_on(async {
let st =
stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(10..=12)]);
{
block_on(async {
let st = stream::iter(vec![
stream::iter(0..=4u8),
stream::iter(6..=10),
stream::iter(10..=12),
]);

let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;

assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
});
assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
});

block_on(async {
let st =
stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]);
block_on(async {
let st = stream::iter(vec![
stream::iter(0..=4u8),
stream::iter(6..=10),
stream::iter(0..=2),
]);

let mut fm_unordered = st
.flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
.collect::<Vec<_>>()
.await;
let mut fm_unordered = st
.flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
.collect::<Vec<_>>()
.await;

fm_unordered.sort_unstable();
fm_unordered.sort_unstable();

assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
});
assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
});
}

// wake up immmediately
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;
// wake up immediately
{
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;

fl_unordered.sort_unstable();
fl_unordered.sort_unstable();

assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
.await;
block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
.await;

fm_unordered.sort_unstable();
fm_unordered.sort_unstable();

assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
}

// wake up after delay
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;

fl_unordered.sort_unstable();

assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
.await;
{
block_on(async {
let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
.await;

fm_unordered.sort_unstable();
fl_unordered.sort_unstable();

assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
Interchanger { polled: false, base: 0, wake_immediately: false }
block_on(async {
let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>(),
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
);
.await;

fm_unordered.sort_unstable();
fl_unordered.sort_unstable();
fm_unordered.sort_unstable();

assert_eq!(fm_unordered, fl_unordered);
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});

block_on(async {
let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>(),
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.map(|s| s.map(identity))
.flatten_unordered(10)
.collect::<Vec<_>>()
);

fm_unordered.sort_unstable();
fl_unordered.sort_unstable();

assert_eq!(fm_unordered, fl_unordered);
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
}

// waker panics
{
Expand Down

0 comments on commit bf4a826

Please sign in to comment.