Skip to content

Commit

Permalink
Return StreamFut instead of stream, small code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Feb 17, 2020
1 parent 7dc39b0 commit 090d95d
Showing 1 changed file with 30 additions and 29 deletions.
59 changes: 30 additions & 29 deletions futures-util/src/stream/stream/flat_map_unordered.rs
Expand Up @@ -85,8 +85,8 @@ impl ArcWake for PollWaker {
}

/// Future which contains optional stream. If it's `Some`, it will attempt
/// to call `poll_next` on it, returning `Some((item, stream))` in case of
/// `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`.
/// to call `poll_next` on it, returning `Some((item, next_item_fut))` in
/// case of `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`.
/// If `poll_next` will return `Poll::Pending`, it will be forwared to
/// the future, and current task will be notified by waker.
#[must_use = "futures do nothing unless you `.await` or poll them"]
Expand All @@ -96,12 +96,19 @@ struct StreamFut<St> {

impl<St> StreamFut<St> {
unsafe_pinned!(stream: Option<St>);

/// Constructs new `StreamFut` using given `stream`.
fn new(stream: St) -> Self {
Self {
stream: stream.into(),
}
}
}

impl<St: Stream + Unpin> Unpin for StreamFut<St> {}

impl<St: Stream> Future for StreamFut<St> {
type Output = Option<(St::Item, St)>;
type Output = Option<(St::Item, StreamFut<St>)>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let item = if let Some(stream) = self.as_mut().stream().as_pin_mut() {
Expand All @@ -111,9 +118,12 @@ impl<St: Stream> Future for StreamFut<St> {
};

Poll::Ready(item.map(|item| {
(item, unsafe {
self.get_unchecked_mut().stream.take().unwrap()
})
(
item,
StreamFut {
stream: unsafe { self.get_unchecked_mut().stream.take() },
},
)
}))
}
}
Expand Down Expand Up @@ -245,7 +255,7 @@ where
impl<St, U, F> FusedStream for FlatMapUnordered<St, U, F>
where
St: FusedStream,
U: Unpin + FusedStream,
U: FusedStream,
F: FnMut(St::Item) -> U,
{
fn is_terminated(&self) -> bool {
Expand All @@ -263,17 +273,15 @@ where

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut poll_state_value = self.as_mut().poll_state().begin_polling();

let mut next_item = None;
let mut need_to_poll_next = NONE;
let mut polling_with_two_wakers =
poll_state_value & NEED_TO_POLL == NEED_TO_POLL && self.not_exceeded_limit();
let mut polled_stream = false;
let mut polled_futures = false;
let mut stream_will_be_woken = false;
let mut futures_will_be_woken = false;

if poll_state_value & NEED_TO_POLL_STREAM != NONE {
if self.not_exceeded_limit() {
polled_stream = true;
match if polling_with_two_wakers {
let waker = self.create_poll_stream_waker(ctx);
let mut ctx = Context::from_waker(&waker);
Expand All @@ -282,9 +290,7 @@ where
self.as_mut().stream().poll_next(ctx)
} {
Poll::Ready(Some(inner_stream)) => {
self.as_mut().futures().push(StreamFut {
stream: Some(inner_stream),
});
self.as_mut().futures().push(StreamFut::new(inner_stream));
need_to_poll_next |= NEED_TO_POLL_STREAM;
// Polling futures in current iteration with the same context
// is ok because we already received `Poll::Ready` from
Expand All @@ -300,6 +306,7 @@ where
polling_with_two_wakers = false;
}
Poll::Pending => {
stream_will_be_woken = true;
if !polling_with_two_wakers {
need_to_poll_next |= NEED_TO_POLL_STREAM;
}
Expand All @@ -311,25 +318,23 @@ where
}

if poll_state_value & NEED_TO_POLL_FUTURES != NONE {
polled_futures = true;
match if polling_with_two_wakers {
let waker = self.create_poll_futures_waker(ctx);
let mut ctx = Context::from_waker(&waker);
self.as_mut().futures().poll_next(&mut ctx)
} else {
self.as_mut().futures().poll_next(ctx)
} {
Poll::Ready(Some(Some((item, stream)))) => {
self.as_mut().futures().push(StreamFut {
stream: Some(stream),
});
Poll::Ready(Some(Some((item, next_item_fut)))) => {
self.as_mut().futures().push(next_item_fut);
next_item = Some(item);
need_to_poll_next |= NEED_TO_POLL_FUTURES;
}
Poll::Ready(Some(None)) => {
need_to_poll_next |= NEED_TO_POLL_FUTURES;
}
Poll::Pending => {
futures_will_be_woken = true;
if !polling_with_two_wakers {
need_to_poll_next |= NEED_TO_POLL_FUTURES;
}
Expand All @@ -342,16 +347,12 @@ where

let poll_state_value = self.as_mut().poll_state().end_polling(need_to_poll_next);

if poll_state_value & NEED_TO_POLL != NONE {
if !polling_with_two_wakers {
if poll_state_value & NEED_TO_POLL_FUTURES != NONE && !polled_futures
|| poll_state_value & NEED_TO_POLL_STREAM != NONE && !polled_stream
{
ctx.waker().wake_by_ref();
}
} else {
ctx.waker().wake_by_ref();
}
if poll_state_value & NEED_TO_POLL != NONE
&& (polling_with_two_wakers
|| (poll_state_value & NEED_TO_POLL_FUTURES != NONE && !futures_will_be_woken
|| poll_state_value & NEED_TO_POLL_STREAM != NONE && !stream_will_be_woken))
{
ctx.waker().wake_by_ref();
}

if self.futures.is_empty() && self.is_stream_done || next_item.is_some() {
Expand Down

0 comments on commit 090d95d

Please sign in to comment.