Skip to content

Commit

Permalink
Simplify implementation based on review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
phil-opp committed Aug 31, 2020
1 parent e5e005a commit fa749d2
Showing 1 changed file with 3 additions and 14 deletions.
17 changes: 3 additions & 14 deletions futures-util/src/stream/futures_ordered.rs
Expand Up @@ -95,7 +95,6 @@ pub struct FuturesOrdered<T: Future> {
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
next_incoming_index: usize,
next_outgoing_index: usize,
is_terminated: bool,
}

impl<T: Future> Unpin for FuturesOrdered<T> {}
Expand All @@ -111,7 +110,6 @@ impl<Fut: Future> FuturesOrdered<Fut> {
queued_outputs: BinaryHeap::new(),
next_incoming_index: 0,
next_outgoing_index: 0,
is_terminated: false,
}
}

Expand Down Expand Up @@ -142,10 +140,6 @@ impl<Fut: Future> FuturesOrdered<Fut> {
};
self.next_incoming_index += 1;
self.in_progress_queue.push(wrapped);

// Reset the `is_terminated` flag if we've previously marked ourselves
// as terminated.
self.is_terminated = false;
}
}

Expand Down Expand Up @@ -182,12 +176,7 @@ impl<Fut: Future> Stream for FuturesOrdered<Fut> {
this.queued_outputs.push(output)
}
}
None => {
// We can only consider ourselves terminated once we
// have yielded a `None`
this.is_terminated = true;
return Poll::Ready(None);
},
None => return Poll::Ready(None),
}
}
}
Expand All @@ -214,9 +203,9 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
}
}

impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
impl<Fut: Future> FusedStream for FuturesUnordered<Fut> {
fn is_terminated(&self) -> bool {
self.is_terminated
self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
}
}

Expand Down

0 comments on commit fa749d2

Please sign in to comment.