Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds unbonded select #1881

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};

mod select;
pub use self::select::{select, Select};
pub use self::select::{select, select_unbonded, Select};

mod unfold;
pub use self::unfold::{unfold, Unfold};
Expand Down
33 changes: 27 additions & 6 deletions futures-util/src/stream/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct Select<St1, St2> {
stream1: Fuse<St1>,
stream2: Fuse<St2>,
flag: bool,
bonded: bool,
}

impl<St1: Unpin, St2: Unpin> Unpin for Select<St1, St2> {}
Expand All @@ -32,9 +33,22 @@ pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
bonded: true,
}
}

/// Similar to [`select()`] with a distinct difference.
/// If either of the input streams is closed, the returned stream is also closed.
pub fn select_unbonded<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>
{
let mut select = select(stream1, stream2);
select.bonded = false;
select
}


impl<St1, St2> Select<St1, St2> {
/// Acquires a reference to the underlying streams that this combinator is
/// pulling from.
Expand Down Expand Up @@ -77,7 +91,11 @@ impl<St1, St2> FusedStream for Select<St1, St2>
St2: Stream<Item = St1::Item>
{
fn is_terminated(&self) -> bool {
self.stream1.is_terminated() && self.stream2.is_terminated()
if self.bonded {
self.stream1.is_terminated() && self.stream2.is_terminated()
} else {
self.stream1.is_terminated() || self.stream2.is_terminated()
}
}
}

Expand All @@ -91,21 +109,22 @@ impl<St1, St2> Stream for Select<St1, St2>
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>> {
let Select { flag, stream1, stream2 } =
let Select { flag, stream1, stream2, bonded } =
unsafe { self.get_unchecked_mut() };
let stream1 = unsafe { Pin::new_unchecked(stream1) };
let stream2 = unsafe { Pin::new_unchecked(stream2) };

if !*flag {
poll_inner(flag, stream1, stream2, cx)
poll_inner(flag, *bonded, stream1, stream2, cx)
} else {
poll_inner(flag, stream2, stream1, cx)
poll_inner(flag, *bonded, stream2, stream1, cx)
}
}
}

fn poll_inner<St1, St2>(
flag: &mut bool,
bonded: bool,
a: Pin<&mut St1>,
b: Pin<&mut St2>,
cx: &mut Context<'_>
Expand All @@ -118,15 +137,17 @@ fn poll_inner<St1, St2>(
*flag = !*flag;
return Poll::Ready(Some(item))
},
Poll::Ready(None) => true,
Poll::Ready(None) if bonded => true,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => false,
};

match b.poll_next(cx) {
Poll::Ready(Some(item)) => {
Poll::Ready(Some(item))
}
Poll::Ready(None) if a_done => Poll::Ready(None),
Poll::Ready(None) if a_done && bonded => Poll::Ready(None),
Poll::Ready(None) if !bonded => Poll::Ready(None),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ pub mod stream {
pending, Pending,
once, Once,
poll_fn, PollFn,
select, Select,
select, select_unbonded, Select,
unfold, Unfold,
try_unfold, TryUnfold,

Expand Down