diff --git a/src/lib.rs b/src/lib.rs index e7f2f13..308c1b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,11 +16,11 @@ use futures::StreamExt; use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures_timer::Delay; pub trait ChunksTimeoutStreamExt: Stream { - fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout + fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout where Self: Sized, { @@ -29,28 +29,70 @@ pub trait ChunksTimeoutStreamExt: Stream { } impl ChunksTimeoutStreamExt for T where T: Stream {} +pub enum FlushEvent { + Full, + MinTimeoutTimer, + MaxTimeoutTimer, +} + +pub trait StatsStrategy { + fn add(&mut self, event: FlushEvent); +} + +pub struct NoStats; + +impl StatsStrategy for NoStats { + fn add(&mut self, _event: FlushEvent) {} +} + #[derive(Debug)] #[must_use = "streams do nothing unless polled"] -pub struct ChunksTimeout { +pub struct ChunksTimeout { stream: Fuse, items: Vec, cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475 clock: Option, - duration: Duration, + min_duration: Option, + max_duration: Duration, + last_flush_time: Instant, + stats: Stats, } -impl Unpin for ChunksTimeout {} +impl Unpin for ChunksTimeout {} + +impl ChunksTimeout + where + St: Stream, +{ + pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout { + Self::with_stats(stream, capacity, None, duration, NoStats{}) + } + + // Creates a ChunksTimeout with an additional passive timer with min_duration. + // + // Small timeout values, 20 microseconds e.g., could lead to heavy overhead + // from conditional variables to awake this future inside the runtime. + // To solve that, we add a light-weight passive timer with `min_duration` timeout + // which can only be driven by latter items inside the inner stream + // and use the real timer with `max_duration` timeout as the deadline timeout + // for the case that there's no upcoming item. + pub fn with_min_timeout(stream: St, capacity: usize, min_duration: Duration, max_duration: Duration) -> ChunksTimeout { + Self::with_stats(stream, capacity, Some(min_duration), max_duration, NoStats{}) + } +} -impl ChunksTimeout +impl ChunksTimeout where St: Stream, { unsafe_unpinned!(items: Vec); unsafe_pinned!(clock: Option); unsafe_pinned!(stream: Fuse); + unsafe_unpinned!(last_flush_time: Instant); + unsafe_unpinned!(stats: Stats); - pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout { + pub fn with_stats(stream: St, capacity: usize, min_duration: Option, max_duration: Duration, stats: Stats) -> ChunksTimeout { assert!(capacity > 0); ChunksTimeout { @@ -58,7 +100,10 @@ where items: Vec::with_capacity(capacity), cap: capacity, clock: None, - duration, + min_duration, + max_duration, + last_flush_time: Instant::now(), + stats, } } @@ -98,9 +143,18 @@ where pub fn into_inner(self) -> St { self.stream.into_inner() } + + fn flush(mut self: Pin<&mut Self>, event: FlushEvent) -> Poll>> { + *self.as_mut().clock() = None; + if self.min_duration.is_some() { + *self.as_mut().last_flush_time() = Instant::now(); + } + self.as_mut().stats().add(event); + return Poll::Ready(Some(self.as_mut().take())); + } } -impl Stream for ChunksTimeout { +impl Stream for ChunksTimeout { type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -113,12 +167,11 @@ impl Stream for ChunksTimeout { Some(item) => { if self.items.is_empty() { *self.as_mut().clock() = - Some(Delay::new(self.duration)); + Some(Delay::new(self.max_duration)); } self.as_mut().items().push(item); if self.items.len() >= self.cap { - *self.as_mut().clock() = None; - return Poll::Ready(Some(self.as_mut().take())); + return self.flush(FlushEvent::Full) } else { // Continue the loop continue; @@ -142,10 +195,21 @@ impl Stream for ChunksTimeout { Poll::Pending => {} } + if self.items.is_empty() { + return Poll::Pending; + } + + if let Some(min_duration) = self.min_duration { + let now = Instant::now(); + if now > self.last_flush_time + && now.duration_since(self.last_flush_time) >= min_duration { + return self.flush(FlushEvent::MinTimeoutTimer); + } + } + match self.as_mut().clock().as_pin_mut().map(|clock| clock.poll(cx)) { Some(Poll::Ready(())) => { - *self.as_mut().clock() = None; - return Poll::Ready(Some(self.as_mut().take())); + return self.flush(FlushEvent::MaxTimeoutTimer); } Some(Poll::Pending) => {} None => { @@ -172,7 +236,7 @@ impl Stream for ChunksTimeout { } } -impl FusedStream for ChunksTimeout { +impl FusedStream for ChunksTimeout { fn is_terminated(&self) -> bool { self.stream.is_terminated() & self.items.is_empty() } @@ -180,7 +244,7 @@ impl FusedStream for ChunksTimeout { // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for ChunksTimeout +impl Sink for ChunksTimeout where S: Stream + Sink, {