Skip to content

Commit

Permalink
Support light-weight passive timer for optimization on low timeout va…
Browse files Browse the repository at this point in the history
…lues
  • Loading branch information
doyoubi committed Nov 2, 2019
1 parent b343e90 commit fa763cd
Showing 1 changed file with 80 additions and 16 deletions.
96 changes: 80 additions & 16 deletions src/lib.rs
Expand Up @@ -16,11 +16,11 @@ use futures::StreamExt;
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};

use std::time::Duration;
use std::time::{Duration, Instant};
use futures_timer::Delay;

pub trait ChunksTimeoutStreamExt: Stream {
fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self>
fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self, NoStats>
where
Self: Sized,
{
Expand All @@ -29,36 +29,81 @@ pub trait ChunksTimeoutStreamExt: Stream {
}
impl<T: ?Sized> ChunksTimeoutStreamExt for T where T: Stream {}

pub enum FlushEvent {
Full,
MinTimeoutTimer,
MaxTimeoutTimer,
}

pub trait StatsStrategy {
fn add(&mut self, event: FlushEvent);
}

pub struct NoStats;

impl StatsStrategy for NoStats {
fn add(&mut self, _event: FlushEvent) {}
}

#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ChunksTimeout<St: Stream> {
pub struct ChunksTimeout<St: Stream, Stats: StatsStrategy> {
stream: Fuse<St>,
items: Vec<St::Item>,
cap: usize,
// https://github.com/rust-lang-nursery/futures-rs/issues/1475
clock: Option<Delay>,
duration: Duration,
min_duration: Option<Duration>,
max_duration: Duration,
last_flush_time: Instant,
stats: Stats,
}

impl<St: Unpin + Stream> Unpin for ChunksTimeout<St> {}
impl<St: Unpin + Stream, Stats: StatsStrategy> Unpin for ChunksTimeout<St, Stats> {}

impl<St: Stream> ChunksTimeout<St, NoStats>
where
St: Stream,
{
pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout<St, NoStats> {
Self::with_stats(stream, capacity, None, duration, NoStats{})
}

// Creates a ChunksTimeout with an additional passive timer with min_duration.
//
// Small timeout values, 20 microseconds e.g., could lead to heavy overhead
// from conditional variables to awake this future inside the runtime.
// To solve that, we add a light-weight passive timer with `min_duration` timeout
// which can only be driven by latter items inside the inner stream
// and use the real timer with `max_duration` timeout as the deadline timeout
// for the case that there's no upcoming item.
pub fn with_min_timeout(stream: St, capacity: usize, min_duration: Duration, max_duration: Duration) -> ChunksTimeout<St, NoStats> {
Self::with_stats(stream, capacity, Some(min_duration), max_duration, NoStats{})
}
}

impl<St: Stream> ChunksTimeout<St>
impl<St: Stream, Stats: StatsStrategy> ChunksTimeout<St, Stats>
where
St: Stream,
{
unsafe_unpinned!(items: Vec<St::Item>);
unsafe_pinned!(clock: Option<Delay>);
unsafe_pinned!(stream: Fuse<St>);
unsafe_unpinned!(last_flush_time: Instant);
unsafe_unpinned!(stats: Stats);

pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout<St> {
pub fn with_stats(stream: St, capacity: usize, min_duration: Option<Duration>, max_duration: Duration, stats: Stats) -> ChunksTimeout<St, Stats> {
assert!(capacity > 0);

ChunksTimeout {
stream: stream.fuse(),
items: Vec::with_capacity(capacity),
cap: capacity,
clock: None,
duration,
min_duration,
max_duration,
last_flush_time: Instant::now(),
stats,
}
}

Expand Down Expand Up @@ -98,9 +143,18 @@ where
pub fn into_inner(self) -> St {
self.stream.into_inner()
}

fn flush(mut self: Pin<&mut Self>, event: FlushEvent) -> Poll<Option<Vec<St::Item>>> {
*self.as_mut().clock() = None;
if self.min_duration.is_some() {
*self.as_mut().last_flush_time() = Instant::now();
}
self.as_mut().stats().add(event);
return Poll::Ready(Some(self.as_mut().take()));
}
}

impl<St: Stream> Stream for ChunksTimeout<St> {
impl<St: Stream, Stats: StatsStrategy> Stream for ChunksTimeout<St, Stats> {
type Item = Vec<St::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -113,12 +167,11 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
Some(item) => {
if self.items.is_empty() {
*self.as_mut().clock() =
Some(Delay::new(self.duration));
Some(Delay::new(self.max_duration));
}
self.as_mut().items().push(item);
if self.items.len() >= self.cap {
*self.as_mut().clock() = None;
return Poll::Ready(Some(self.as_mut().take()));
return self.flush(FlushEvent::Full)
} else {
// Continue the loop
continue;
Expand All @@ -142,10 +195,21 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
Poll::Pending => {}
}

if self.items.is_empty() {
return Poll::Pending;
}

if let Some(min_duration) = self.min_duration {
let now = Instant::now();
if now > self.last_flush_time
&& now.duration_since(self.last_flush_time) >= min_duration {
return self.flush(FlushEvent::MinTimeoutTimer);
}
}

match self.as_mut().clock().as_pin_mut().map(|clock| clock.poll(cx)) {
Some(Poll::Ready(())) => {
*self.as_mut().clock() = None;
return Poll::Ready(Some(self.as_mut().take()));
return self.flush(FlushEvent::MaxTimeoutTimer);
}
Some(Poll::Pending) => {}
None => {
Expand All @@ -172,15 +236,15 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
}
}

impl<St: FusedStream> FusedStream for ChunksTimeout<St> {
impl<St: FusedStream, Stats: StatsStrategy> FusedStream for ChunksTimeout<St, Stats> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() & self.items.is_empty()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for ChunksTimeout<S>
impl<S, Stats: StatsStrategy, Item> Sink<Item> for ChunksTimeout<S, Stats>
where
S: Stream + Sink<Item>,
{
Expand Down

0 comments on commit fa763cd

Please sign in to comment.