From b343e908973416e8cff69a3d110d57c06880d2e0 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Sat, 2 Nov 2019 16:42:26 +0800 Subject: [PATCH] Replace tokio-timer with futures-timer for higher resolution --- Cargo.toml | 3 +-- src/lib.rs | 30 +++++++++++------------------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bdbe8bd..80358a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,7 @@ repository = "https://github.com/mre/tokio-batch" futures-preview = { version = "0.3.0-alpha.19", features = ["compat"] } pin-utils = "0.1.0-alpha.4" tokio = "0.1.22" -tokio-timer = "0.2.11" -futures01 = { package = "futures", version = "0.1.29" } +futures-timer = "1.0.2" [dev-dependencies.doc-comment] version = "0.3" diff --git a/src/lib.rs b/src/lib.rs index dac521b..e7f2f13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,16 +9,15 @@ doctest!("../README.md"); use core::mem; use core::pin::Pin; use futures::stream::{Fuse, FusedStream, Stream}; +use futures::Future; use futures::task::{Context, Poll}; use futures::StreamExt; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use futures01::Async; -use std::time::{Duration, Instant}; -use tokio::prelude::Future; -use tokio::timer::Delay; +use std::time::Duration; +use futures_timer::Delay; pub trait ChunksTimeoutStreamExt: Stream { fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout @@ -48,7 +47,7 @@ where St: Stream, { unsafe_unpinned!(items: Vec); - unsafe_unpinned!(clock: Option); + unsafe_pinned!(clock: Option); unsafe_pinned!(stream: Fuse); pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout { @@ -114,7 +113,7 @@ impl Stream for ChunksTimeout { Some(item) => { if self.items.is_empty() { *self.as_mut().clock() = - Some(Delay::new(Instant::now() + self.duration)); + Some(Delay::new(self.duration)); } self.as_mut().items().push(item); if self.items.len() >= self.cap { @@ -143,25 +142,20 @@ impl Stream for ChunksTimeout { Poll::Pending => {} } - match self.as_mut().clock().poll() { - Ok(Async::Ready(Some(()))) => { + match self.as_mut().clock().as_pin_mut().map(|clock| clock.poll(cx)) { + Some(Poll::Ready(())) => { *self.as_mut().clock() = None; return Poll::Ready(Some(self.as_mut().take())); } - Ok(Async::Ready(None)) => { + Some(Poll::Pending) => {} + None => { debug_assert!( self.as_mut().items().is_empty(), "Inner buffer is empty, but clock is available." ); } - Ok(Async::NotReady) => {} - Err(_e) => { - if !self.as_mut().items().is_empty() { - *self.as_mut().clock() = None; - return Poll::Ready(Some(self.as_mut().take())); - } - } } + return Poll::Pending; } } @@ -198,7 +192,6 @@ where #[cfg(test)] mod tests { use super::*; - use futures::compat::Future01CompatExt; use futures::future; use futures::{stream, FutureExt, StreamExt, TryFutureExt}; use std::iter; @@ -266,8 +259,7 @@ mod tests { let iter = vec![5].into_iter(); let stream1 = stream::iter(iter).then(move |n| { - Delay::new(Instant::now() + Duration::from_millis(300)) - .compat() + Delay::new(Duration::from_millis(300)) .map(move |_| n) });