Skip to content

Commit

Permalink
Merge branch 'master' into asyncfd
Browse files Browse the repository at this point in the history
  • Loading branch information
bdonlan committed Oct 5, 2020
2 parents 9dc0ace + 9730317 commit 03399d8
Show file tree
Hide file tree
Showing 12 changed files with 767 additions and 34 deletions.
4 changes: 3 additions & 1 deletion tokio-util/Cargo.toml
Expand Up @@ -25,10 +25,11 @@ publish = false
default = []

# Shorthand for enabling everything
full = ["codec", "compat", "io"]
full = ["codec", "compat", "io", "time"]

compat = ["futures-io",]
codec = ["tokio/stream"]
time = ["tokio/time","slab"]
io = []

[dependencies]
Expand All @@ -40,6 +41,7 @@ futures-sink = "0.3.0"
futures-io = { version = "0.3.0", optional = true }
log = "0.4"
pin-project-lite = "0.1.4"
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`

[dev-dependencies]
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
Expand Down
3 changes: 3 additions & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -53,6 +53,9 @@ pub mod sync;

pub mod either;

#[cfg(feature = "time")]
pub mod time;

#[cfg(any(feature = "io", feature = "codec"))]
mod util {
use tokio::io::{AsyncRead, ReadBuf};
Expand Down
Expand Up @@ -5,7 +5,9 @@
//! [`DelayQueue`]: struct@DelayQueue

use crate::time::wheel::{self, Wheel};
use crate::time::{sleep_until, Delay, Duration, Error, Instant};

use futures_core::ready;
use tokio::time::{sleep_until, Delay, Duration, Error, Instant};

use slab::Slab;
use std::cmp;
Expand Down Expand Up @@ -50,8 +52,8 @@ use std::task::{self, Poll};
///
/// # Implementation
///
/// The [`DelayQueue`] is backed by a separate instance of the same timer wheel used internally by
/// Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
/// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally
/// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
/// performance and scalability benefits.
///
/// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation,
Expand All @@ -65,7 +67,8 @@ use std::task::{self, Poll};
/// Using `DelayQueue` to manage cache entries.
///
/// ```rust,no_run
/// use tokio::time::{delay_queue, DelayQueue, Error};
/// use tokio::time::Error;
/// use tokio_util::time::{DelayQueue, delay_queue};
///
/// use futures::ready;
/// use std::collections::HashMap;
Expand Down Expand Up @@ -118,7 +121,7 @@ use std::task::{self, Poll};
/// [`poll_expired`]: method@Self::poll_expired
/// [`Stream::poll_expired`]: method@Self::poll_expired
/// [`DelayQueue`]: struct@DelayQueue
/// [`sleep`]: fn@super::sleep
/// [`sleep`]: fn@tokio::time::sleep
/// [`slab`]: slab
/// [`capacity`]: method@Self::capacity
/// [`reserve`]: method@Self::reserve
Expand Down Expand Up @@ -210,7 +213,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
/// # use tokio::time::DelayQueue;
/// # use tokio_util::time::DelayQueue;
/// let delay_queue: DelayQueue<u32> = DelayQueue::new();
/// ```
pub fn new() -> DelayQueue<T> {
Expand All @@ -226,7 +229,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
/// # use tokio::time::DelayQueue;
/// # use tokio_util::time::DelayQueue;
/// # use std::time::Duration;
///
/// # #[tokio::main]
Expand Down Expand Up @@ -281,7 +284,8 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
/// use tokio::time::{DelayQueue, Duration, Instant};
/// use tokio::time::{Duration, Instant};
/// use tokio_util::time::DelayQueue;
///
/// # #[tokio::main]
/// # async fn main() {
Expand Down Expand Up @@ -391,7 +395,7 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand Down Expand Up @@ -460,7 +464,7 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand Down Expand Up @@ -503,7 +507,8 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
/// use tokio::time::{DelayQueue, Duration, Instant};
/// use tokio::time::{Duration, Instant};
/// use tokio_util::time::DelayQueue;
///
/// # #[tokio::main]
/// # async fn main() {
Expand Down Expand Up @@ -559,7 +564,7 @@ impl<T> DelayQueue<T> {
/// Basic usage
///
/// ```rust
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand Down Expand Up @@ -589,7 +594,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand Down Expand Up @@ -617,7 +622,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
///
/// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
/// assert_eq!(delay_queue.capacity(), 10);
Expand All @@ -631,7 +636,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```rust
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand Down Expand Up @@ -666,7 +671,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand All @@ -691,7 +696,7 @@ impl<T> DelayQueue<T> {
/// # Examples
///
/// ```
/// use tokio::time::DelayQueue;
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
Expand Down
47 changes: 47 additions & 0 deletions tokio-util/src/time/mod.rs
@@ -0,0 +1,47 @@
//! Additional utilities for tracking time.
//!
//! This module provides additional utilities for executing code after a set period
//! of time. Currently there is only one:
//!
//! * `DelayQueue`: A queue where items are returned once the requested delay
//! has expired.
//!
//! This type must be used from within the context of the `Runtime`.

use std::time::Duration;

mod wheel;

#[doc(inline)]
pub mod delay_queue;

pub use delay_queue::DelayQueue;

// ===== Internal utils =====

enum Round {
Up,
Down,
}

/// Convert a `Duration` to milliseconds, rounding up and saturating at
/// `u64::MAX`.
///
/// The saturating is fine because `u64::MAX` milliseconds are still many
/// million years.
#[inline]
fn ms(duration: Duration, round: Round) -> u64 {
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;

// Round up.
let millis = match round {
Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI,
Round::Down => duration.subsec_millis(),
};

duration
.as_secs()
.saturating_mul(MILLIS_PER_SEC)
.saturating_add(u64::from(millis))
}

0 comments on commit 03399d8

Please sign in to comment.