Skip to content

Commit

Permalink
Implement early-exit stream selection with a trait
Browse files Browse the repository at this point in the history
  • Loading branch information
414owen committed Jun 1, 2022
1 parent b97d95e commit b593044
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 110 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Expand Up @@ -104,7 +104,7 @@ pub use self::select::{select, select_early_exit, Select};

mod select_with_strategy;
pub use self::select_with_strategy::{
select_with_strategy, ExitStrategy, PollNext, SelectWithStrategy,
select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy,
};

mod unfold;
Expand Down
73 changes: 49 additions & 24 deletions futures-util/src/stream/select.rs
@@ -1,20 +1,54 @@
use super::assert_stream;
use crate::stream::{select_with_strategy, ExitStrategy, PollNext, SelectWithStrategy};
use crate::stream::{
select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy,
};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;

type PollNextFn = fn(&mut PollNext) -> PollNext;

pin_project! {
/// Stream for the [`select()`] function.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Select<St1, St2> {
pub struct Select<St1, St2, Exit> {
#[pin]
inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>,
inner: SelectWithStrategy<St1, St2, PollNextFn, PollNext, Exit>,
}
}

#[derive(Debug)]
pub struct ExitWhenBothFinished {}

impl ExitStrategy for ExitWhenBothFinished {
#[inline]
fn is_terminated(closed_streams: ClosedStreams) -> bool {
match closed_streams {
ClosedStreams::Both => true,
_ => false,
}
}
}

#[derive(Debug)]
pub struct ExitWhenEitherFinished {}

impl ExitStrategy for ExitWhenEitherFinished {
#[inline]
fn is_terminated(closed_streams: ClosedStreams) -> bool {
match closed_streams {
ClosedStreams::None => false,
_ => true,
}
}
}

fn round_robin(last: &mut PollNext) -> PollNext {
last.toggle()
}

/// This function will attempt to pull items from both streams. Each
/// stream will be polled in a round-robin fashion, and whenever a stream is
/// ready to yield an item that item is yielded.
Expand Down Expand Up @@ -44,42 +78,31 @@ pin_project! {
/// }
/// # });
/// ```
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2, ExitWhenBothFinished>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
select_with_exit(stream1, stream2, ExitStrategy::WhenBothFinish)
assert_stream::<St1::Item, _>(Select {
inner: select_with_strategy(stream1, stream2, round_robin),
})
}

/// Same as `select`, but finishes when either stream finishes
pub fn select_early_exit<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
select_with_exit(stream1, stream2, ExitStrategy::WhenEitherFinish)
}

fn select_with_exit<St1, St2>(
pub fn select_early_exit<St1, St2>(
stream1: St1,
stream2: St2,
exit_strategy: ExitStrategy,
) -> Select<St1, St2>
) -> Select<St1, St2, ExitWhenEitherFinished>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
fn round_robin(last: &mut PollNext) -> PollNext {
last.toggle()
}

assert_stream::<St1::Item, _>(Select {
inner: select_with_strategy(stream1, stream2, round_robin, exit_strategy),
inner: select_with_strategy(stream1, stream2, round_robin),
})
}

impl<St1, St2> Select<St1, St2> {
impl<St1, St2, Exit> Select<St1, St2, Exit> {
/// Acquires a reference to the underlying streams that this combinator is
/// pulling from.
pub fn get_ref(&self) -> (&St1, &St2) {
Expand Down Expand Up @@ -114,20 +137,22 @@ impl<St1, St2> Select<St1, St2> {
}
}

impl<St1, St2> FusedStream for Select<St1, St2>
impl<St1, St2, Exit> FusedStream for Select<St1, St2, Exit>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
Exit: ExitStrategy,
{
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<St1, St2> Stream for Select<St1, St2>
impl<St1, St2, Exit> Stream for Select<St1, St2, Exit>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
Exit: ExitStrategy,
{
type Item = St1::Item;

Expand Down

0 comments on commit b593044

Please sign in to comment.