diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 45daa2b13cc..8c54f27b503 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -25,10 +25,11 @@ publish = false default = [] # Shorthand for enabling everything -full = ["codec", "compat", "io"] +full = ["codec", "compat", "io", "time"] compat = ["futures-io",] codec = ["tokio/stream"] +time = ["tokio/time","slab"] io = [] [dependencies] @@ -40,6 +41,7 @@ futures-sink = "0.3.0" futures-io = { version = "0.3.0", optional = true } log = "0.4" pin-project-lite = "0.1.4" +slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` [dev-dependencies] tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index eb35345e796..31a16d05229 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -53,6 +53,9 @@ pub mod sync; pub mod either; +#[cfg(feature = "time")] +pub mod time; + #[cfg(any(feature = "io", feature = "codec"))] mod util { use tokio::io::{AsyncRead, ReadBuf}; diff --git a/tokio/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs similarity index 96% rename from tokio/src/time/delay_queue.rs rename to tokio-util/src/time/delay_queue.rs index 910f75fb320..b23c24e6149 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -5,7 +5,9 @@ //! [`DelayQueue`]: struct@DelayQueue use crate::time::wheel::{self, Wheel}; -use crate::time::{sleep_until, Delay, Duration, Error, Instant}; + +use futures_core::ready; +use tokio::time::{sleep_until, Delay, Duration, Error, Instant}; use slab::Slab; use std::cmp; @@ -50,8 +52,8 @@ use std::task::{self, Poll}; /// /// # Implementation /// -/// The [`DelayQueue`] is backed by a separate instance of the same timer wheel used internally by -/// Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same +/// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally +/// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same /// performance and scalability benefits. /// /// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation, @@ -65,7 +67,8 @@ use std::task::{self, Poll}; /// Using `DelayQueue` to manage cache entries. /// /// ```rust,no_run -/// use tokio::time::{delay_queue, DelayQueue, Error}; +/// use tokio::time::Error; +/// use tokio_util::time::{DelayQueue, delay_queue}; /// /// use futures::ready; /// use std::collections::HashMap; @@ -118,7 +121,7 @@ use std::task::{self, Poll}; /// [`poll_expired`]: method@Self::poll_expired /// [`Stream::poll_expired`]: method@Self::poll_expired /// [`DelayQueue`]: struct@DelayQueue -/// [`sleep`]: fn@super::sleep +/// [`sleep`]: fn@tokio::time::sleep /// [`slab`]: slab /// [`capacity`]: method@Self::capacity /// [`reserve`]: method@Self::reserve @@ -210,7 +213,7 @@ impl DelayQueue { /// # Examples /// /// ```rust - /// # use tokio::time::DelayQueue; + /// # use tokio_util::time::DelayQueue; /// let delay_queue: DelayQueue = DelayQueue::new(); /// ``` pub fn new() -> DelayQueue { @@ -226,7 +229,7 @@ impl DelayQueue { /// # Examples /// /// ```rust - /// # use tokio::time::DelayQueue; + /// # use tokio_util::time::DelayQueue; /// # use std::time::Duration; /// /// # #[tokio::main] @@ -281,7 +284,8 @@ impl DelayQueue { /// Basic usage /// /// ```rust - /// use tokio::time::{DelayQueue, Duration, Instant}; + /// use tokio::time::{Duration, Instant}; + /// use tokio_util::time::DelayQueue; /// /// # #[tokio::main] /// # async fn main() { @@ -391,7 +395,7 @@ impl DelayQueue { /// Basic usage /// /// ```rust - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] @@ -460,7 +464,7 @@ impl DelayQueue { /// Basic usage /// /// ```rust - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] @@ -503,7 +507,8 @@ impl DelayQueue { /// Basic usage /// /// ```rust - /// use tokio::time::{DelayQueue, Duration, Instant}; + /// use tokio::time::{Duration, Instant}; + /// use tokio_util::time::DelayQueue; /// /// # #[tokio::main] /// # async fn main() { @@ -559,7 +564,7 @@ impl DelayQueue { /// Basic usage /// /// ```rust - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] @@ -589,7 +594,7 @@ impl DelayQueue { /// # Examples /// /// ```rust - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] @@ -617,7 +622,7 @@ impl DelayQueue { /// # Examples /// /// ```rust - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// /// let delay_queue: DelayQueue = DelayQueue::with_capacity(10); /// assert_eq!(delay_queue.capacity(), 10); @@ -631,7 +636,7 @@ impl DelayQueue { /// # Examples /// /// ```rust - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] @@ -666,7 +671,7 @@ impl DelayQueue { /// # Examples /// /// ``` - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] @@ -691,7 +696,7 @@ impl DelayQueue { /// # Examples /// /// ``` - /// use tokio::time::DelayQueue; + /// use tokio_util::time::DelayQueue; /// use std::time::Duration; /// /// # #[tokio::main] diff --git a/tokio-util/src/time/mod.rs b/tokio-util/src/time/mod.rs new file mode 100644 index 00000000000..c6c8799d99e --- /dev/null +++ b/tokio-util/src/time/mod.rs @@ -0,0 +1,47 @@ +//! Additional utilities for tracking time. +//! +//! This module provides additional utilities for executing code after a set period +//! of time. Currently there is only one: +//! +//! * `DelayQueue`: A queue where items are returned once the requested delay +//! has expired. +//! +//! This type must be used from within the context of the `Runtime`. + +use std::time::Duration; + +mod wheel; + +#[doc(inline)] +pub mod delay_queue; + +pub use delay_queue::DelayQueue; + +// ===== Internal utils ===== + +enum Round { + Up, + Down, +} + +/// Convert a `Duration` to milliseconds, rounding up and saturating at +/// `u64::MAX`. +/// +/// The saturating is fine because `u64::MAX` milliseconds are still many +/// million years. +#[inline] +fn ms(duration: Duration, round: Round) -> u64 { + const NANOS_PER_MILLI: u32 = 1_000_000; + const MILLIS_PER_SEC: u64 = 1_000; + + // Round up. + let millis = match round { + Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI, + Round::Down => duration.subsec_millis(), + }; + + duration + .as_secs() + .saturating_mul(MILLIS_PER_SEC) + .saturating_add(u64::from(millis)) +} diff --git a/tokio-util/src/time/wheel/level.rs b/tokio-util/src/time/wheel/level.rs new file mode 100644 index 00000000000..49f9bfb9cf0 --- /dev/null +++ b/tokio-util/src/time/wheel/level.rs @@ -0,0 +1,255 @@ +use crate::time::wheel::Stack; + +use std::fmt; + +/// Wheel for a single level in the timer. This wheel contains 64 slots. +pub(crate) struct Level { + level: usize, + + /// Bit field tracking which slots currently contain entries. + /// + /// Using a bit field to track slots that contain entries allows avoiding a + /// scan to find entries. This field is updated when entries are added or + /// removed from a slot. + /// + /// The least-significant bit represents slot zero. + occupied: u64, + + /// Slots + slot: [T; LEVEL_MULT], +} + +/// Indicates when a slot must be processed next. +#[derive(Debug)] +pub(crate) struct Expiration { + /// The level containing the slot. + pub(crate) level: usize, + + /// The slot index. + pub(crate) slot: usize, + + /// The instant at which the slot needs to be processed. + pub(crate) deadline: u64, +} + +/// Level multiplier. +/// +/// Being a power of 2 is very important. +const LEVEL_MULT: usize = 64; + +impl Level { + pub(crate) fn new(level: usize) -> Level { + // Rust's derived implementations for arrays require that the value + // contained by the array be `Copy`. So, here we have to manually + // initialize every single slot. + macro_rules! s { + () => { + T::default() + }; + }; + + Level { + level, + occupied: 0, + slot: [ + // It does not look like the necessary traits are + // derived for [T; 64]. + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + s!(), + ], + } + } + + /// Finds the slot that needs to be processed next and returns the slot and + /// `Instant` at which this slot must be processed. + pub(crate) fn next_expiration(&self, now: u64) -> Option { + // Use the `occupied` bit field to get the index of the next slot that + // needs to be processed. + let slot = match self.next_occupied_slot(now) { + Some(slot) => slot, + None => return None, + }; + + // From the slot index, calculate the `Instant` at which it needs to be + // processed. This value *must* be in the future with respect to `now`. + + let level_range = level_range(self.level); + let slot_range = slot_range(self.level); + + // TODO: This can probably be simplified w/ power of 2 math + let level_start = now - (now % level_range); + let deadline = level_start + slot as u64 * slot_range; + + debug_assert!( + deadline >= now, + "deadline={}; now={}; level={}; slot={}; occupied={:b}", + deadline, + now, + self.level, + slot, + self.occupied + ); + + Some(Expiration { + level: self.level, + slot, + deadline, + }) + } + + fn next_occupied_slot(&self, now: u64) -> Option { + if self.occupied == 0 { + return None; + } + + // Get the slot for now using Maths + let now_slot = (now / slot_range(self.level)) as usize; + let occupied = self.occupied.rotate_right(now_slot as u32); + let zeros = occupied.trailing_zeros() as usize; + let slot = (zeros + now_slot) % 64; + + Some(slot) + } + + pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) { + let slot = slot_for(when, self.level); + + self.slot[slot].push(item, store); + self.occupied |= occupied_bit(slot); + } + + pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) { + let slot = slot_for(when, self.level); + + self.slot[slot].remove(item, store); + + if self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + // Unset the bit + self.occupied ^= occupied_bit(slot); + } + } + + pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option { + let ret = self.slot[slot].pop(store); + + if ret.is_some() && self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + self.occupied ^= occupied_bit(slot); + } + + ret + } +} + +impl fmt::Debug for Level { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Level") + .field("occupied", &self.occupied) + .finish() + } +} + +fn occupied_bit(slot: usize) -> u64 { + 1 << slot +} + +fn slot_range(level: usize) -> u64 { + LEVEL_MULT.pow(level as u32) as u64 +} + +fn level_range(level: usize) -> u64 { + LEVEL_MULT as u64 * slot_range(level) +} + +/// Convert a duration (milliseconds) and a level to a slot position +fn slot_for(duration: u64, level: usize) -> usize { + ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize +} + +/* +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_slot_for() { + for pos in 1..64 { + assert_eq!(pos as usize, slot_for(pos, 0)); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(pos as usize, slot_for(a as u64, level)); + } + } + } +} +*/ diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs new file mode 100644 index 00000000000..a2ef27fc6c5 --- /dev/null +++ b/tokio-util/src/time/wheel/mod.rs @@ -0,0 +1,314 @@ +mod level; +pub(crate) use self::level::Expiration; +use self::level::Level; + +mod stack; +pub(crate) use self::stack::Stack; + +use std::borrow::Borrow; +use std::usize; + +/// Timing wheel implementation. +/// +/// This type provides the hashed timing wheel implementation that backs `Timer` +/// and `DelayQueue`. +/// +/// The structure is generic over `T: Stack`. This allows handling timeout data +/// being stored on the heap or in a slab. In order to support the latter case, +/// the slab must be passed into each function allowing the implementation to +/// lookup timer entries. +/// +/// See `Timer` documentation for some implementation notes. +#[derive(Debug)] +pub(crate) struct Wheel { + /// The number of milliseconds elapsed since the wheel started. + elapsed: u64, + + /// Timer wheel. + /// + /// Levels: + /// + /// * 1 ms slots / 64 ms range + /// * 64 ms slots / ~ 4 sec range + /// * ~ 4 sec slots / ~ 4 min range + /// * ~ 4 min slots / ~ 4 hr range + /// * ~ 4 hr slots / ~ 12 day range + /// * ~ 12 day slots / ~ 2 yr range + levels: Vec>, +} + +/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots +/// each, the timer is able to track time up to 2 years into the future with a +/// precision of 1 millisecond. +const NUM_LEVELS: usize = 6; + +/// The maximum duration of a delay +const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; + +#[derive(Debug)] +pub(crate) enum InsertError { + Elapsed, + Invalid, +} + +/// Poll expirations from the wheel +#[derive(Debug, Default)] +pub(crate) struct Poll { + now: u64, + expiration: Option, +} + +impl Wheel +where + T: Stack, +{ + /// Create a new timing wheel + pub(crate) fn new() -> Wheel { + let levels = (0..NUM_LEVELS).map(Level::new).collect(); + + Wheel { elapsed: 0, levels } + } + + /// Return the number of milliseconds that have elapsed since the timing + /// wheel's creation. + pub(crate) fn elapsed(&self) -> u64 { + self.elapsed + } + + /// Insert an entry into the timing wheel. + /// + /// # Arguments + /// + /// * `when`: is the instant at which the entry should be fired. It is + /// represented as the number of milliseconds since the creation + /// of the timing wheel. + /// + /// * `item`: The item to insert into the wheel. + /// + /// * `store`: The slab or `()` when using heap storage. + /// + /// # Return + /// + /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. + /// + /// `Err(Elapsed)` indicates that `when` represents an instant that has + /// already passed. In this case, the caller should fire the timeout + /// immediately. + /// + /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. + pub(crate) fn insert( + &mut self, + when: u64, + item: T::Owned, + store: &mut T::Store, + ) -> Result<(), (T::Owned, InsertError)> { + if when <= self.elapsed { + return Err((item, InsertError::Elapsed)); + } else if when - self.elapsed > MAX_DURATION { + return Err((item, InsertError::Invalid)); + } + + // Get the level at which the entry should be stored + let level = self.level_for(when); + + self.levels[level].add_entry(when, item, store); + + debug_assert!({ + self.levels[level] + .next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); + + Ok(()) + } + + /// Remove `item` from thee timing wheel. + pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) { + let when = T::when(item, store); + let level = self.level_for(when); + + self.levels[level].remove_entry(when, item, store); + } + + /// Instant at which to poll + pub(crate) fn poll_at(&self) -> Option { + self.next_expiration().map(|expiration| expiration.deadline) + } + + pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option { + loop { + if poll.expiration.is_none() { + poll.expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > poll.now { + None + } else { + Some(expiration) + } + }); + } + + match poll.expiration { + Some(ref expiration) => { + if let Some(item) = self.poll_expiration(expiration, store) { + return Some(item); + } + + self.set_elapsed(expiration.deadline); + } + None => { + self.set_elapsed(poll.now); + return None; + } + } + + poll.expiration = None; + } + } + + /// Returns the instant at which the next timeout expires. + fn next_expiration(&self) -> Option { + // Check all levels + for level in 0..NUM_LEVELS { + if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + // There cannot be any expirations at a higher level that happen + // before this one. + debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); + + return Some(expiration); + } + } + + None + } + + /// Used for debug assertions + fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { + let mut res = true; + + for l2 in start_level..NUM_LEVELS { + if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + if e2.deadline < before { + res = false; + } + } + } + + res + } + + pub(crate) fn poll_expiration( + &mut self, + expiration: &Expiration, + store: &mut T::Store, + ) -> Option { + while let Some(item) = self.pop_entry(expiration, store) { + if expiration.level == 0 { + debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline); + + return Some(item); + } else { + let when = T::when(item.borrow(), store); + + let next_level = expiration.level - 1; + + self.levels[next_level].add_entry(when, item, store); + } + } + + None + } + + fn set_elapsed(&mut self, when: u64) { + assert!( + self.elapsed <= when, + "elapsed={:?}; when={:?}", + self.elapsed, + when + ); + + if when > self.elapsed { + self.elapsed = when; + } + } + + fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option { + self.levels[expiration.level].pop_entry_slot(expiration.slot, store) + } + + fn level_for(&self, when: u64) -> usize { + level_for(self.elapsed, when) + } +} + +fn level_for(elapsed: u64, when: u64) -> usize { + let masked = elapsed ^ when; + + assert!(masked != 0, "elapsed={}; when={}", elapsed, when); + + let leading_zeros = masked.leading_zeros() as usize; + let significant = 63 - leading_zeros; + significant / 6 +} + +impl Poll { + pub(crate) fn new(now: u64) -> Poll { + Poll { + now, + expiration: None, + } + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[test] + fn test_level_for() { + for pos in 1..64 { + assert_eq!( + 0, + level_for(0, pos), + "level_for({}) -- binary = {:b}", + pos, + pos + ); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + + if pos > level { + let a = a - 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + + if pos < 64 { + let a = a + 1; + assert_eq!( + level, + level_for(0, a as u64), + "level_for({}) -- binary = {:b}", + a, + a + ); + } + } + } + } +} diff --git a/tokio-util/src/time/wheel/stack.rs b/tokio-util/src/time/wheel/stack.rs new file mode 100644 index 00000000000..6e55c38ccda --- /dev/null +++ b/tokio-util/src/time/wheel/stack.rs @@ -0,0 +1,26 @@ +use std::borrow::Borrow; + +/// Abstracts the stack operations needed to track timeouts. +pub(crate) trait Stack: Default { + /// Type of the item stored in the stack + type Owned: Borrow; + + /// Borrowed item + type Borrowed; + + /// Item storage, this allows a slab to be used instead of just the heap + type Store; + + /// Returns `true` if the stack is empty + fn is_empty(&self) -> bool; + + /// Push an item onto the stack + fn push(&mut self, item: Self::Owned, store: &mut Self::Store); + + /// Pop an item from the stack + fn pop(&mut self, store: &mut Self::Store) -> Option; + + fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store); + + fn when(item: &Self::Borrowed, store: &Self::Store) -> u64; +} diff --git a/tokio/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs similarity index 99% rename from tokio/tests/time_delay_queue.rs rename to tokio-util/tests/time_delay_queue.rs index d4878b91016..73951e7c2c0 100644 --- a/tokio/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -2,8 +2,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::time::{self, sleep, DelayQueue, Duration, Instant}; +use tokio::time::{self, sleep, Duration, Instant}; use tokio_test::{assert_ok, assert_pending, assert_ready, task}; +use tokio_util::time::DelayQueue; macro_rules! poll { ($queue:ident) => { diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b7085da2bbb..8d2efd6837c 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -84,11 +84,11 @@ signal = [ stream = ["futures-core"] sync = ["fnv"] test-util = [] -tcp = ["lazy_static", "mio/tcp", "mio/os-poll", "mio/os-util"] -time = ["slab"] -udp = ["lazy_static", "mio/udp", "mio/os-poll", "mio/os-util"] -uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll", "mio/os-util"] -async-fd = ["lazy_static", "mio/udp", "mio/os-poll", "mio/os-util"] +tcp = ["lazy_static", "mio/tcp", "mio/os-poll"] +time = [] +udp = ["lazy_static", "mio/udp", "mio/os-poll"] +uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"] +async-fd = ["lazy_static", "mio/udp", "mio/os-poll"] [dependencies] tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } @@ -104,7 +104,7 @@ memchr = { version = "2.2", optional = true } mio = { version = "0.7.2", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } # Not in full -slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` +slab = { version = "0.4.1", optional = true } tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs index 1bfab90220f..9e87f2f0dcf 100644 --- a/tokio/src/io/util/async_buf_read_ext.rs +++ b/tokio/src/io/util/async_buf_read_ext.rs @@ -14,7 +14,7 @@ cfg_io_util! { /// Equivalent to: /// /// ```ignore - /// async fn read_until(&mut self, buf: &mut Vec) -> io::Result; + /// async fn read_until(&mut self, byte: u8, buf: &mut Vec) -> io::Result; /// ``` /// /// This function will read bytes from the underlying stream until the diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index c8c797d9747..a68e11b56b4 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -14,9 +14,6 @@ //! of time it is allowed to execute. If the future or stream does not //! complete in time, then it is canceled and an error is returned. //! -//! * `DelayQueue`: A queue where items are returned once the requested delay -//! has expired. -//! //! These types are sufficient for handling a large number of scenarios //! involving time. //! @@ -96,10 +93,6 @@ pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; -pub mod delay_queue; -#[doc(inline)] -pub use delay_queue::DelayQueue; - mod delay; pub use delay::{sleep, sleep_until, Delay}; diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index c82d8a5a46b..2ee38570025 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -2,7 +2,7 @@ #![cfg(feature = "full")] use std::cell::Cell; -use std::io::Cursor; +use std::io::{Cursor, SeekFrom}; use std::net::SocketAddr; use std::rc::Rc; use tokio::net::TcpStream; @@ -14,11 +14,21 @@ type BoxFutureSync = std::pin::Pin + type BoxFutureSend = std::pin::Pin + Send>>; #[allow(dead_code)] type BoxFuture = std::pin::Pin>>; +#[allow(dead_code)] +type BoxStream = std::pin::Pin>>; +#[allow(dead_code)] +type BoxAsyncRead = std::pin::Pin>; +#[allow(dead_code)] +type BoxAsyncSeek = std::pin::Pin>; +#[allow(dead_code)] +type BoxAsyncWrite = std::pin::Pin>; #[allow(dead_code)] fn require_send(_t: &T) {} #[allow(dead_code)] fn require_sync(_t: &T) {} +#[allow(dead_code)] +fn require_unpin(_t: &T) {} #[allow(dead_code)] struct Invalid; @@ -35,6 +45,12 @@ trait AmbiguousIfSync { impl AmbiguousIfSync<()> for T {} impl AmbiguousIfSync for T {} +trait AmbiguousIfUnpin { + fn some_item(&self) {} +} +impl AmbiguousIfUnpin<()> for T {} +impl AmbiguousIfUnpin for T {} + macro_rules! into_todo { ($typ:ty) => {{ let x: $typ = todo!(); @@ -116,6 +132,22 @@ macro_rules! async_assert_fn { AmbiguousIfSync::some_item(&f); }; }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfUnpin::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_unpin(&f); + }; + }; } async_assert_fn!(tokio::io::copy(&mut TcpStream, &mut TcpStream): Send & Sync); @@ -252,3 +284,58 @@ async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSync<()>): Send & Syn async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sync); async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync); async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync); + +async_assert_fn!(tokio::stream::StreamExt::next(&mut BoxStream<()>): !Unpin); +async_assert_fn!(tokio::stream::StreamExt::try_next(&mut BoxStream>): !Unpin); +async_assert_fn!(tokio::stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio::stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio::stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin); +async_assert_fn!(tokio::stream::StreamExt::collect>(&mut BoxStream<()>): !Unpin); + +async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec): !Unpin); +async_assert_fn!(tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_exact(&mut BoxAsyncRead, &mut [u8]): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u8(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i8(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u16(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i16(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u32(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i32(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u64(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i64(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u128(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i128(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u16_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i16_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u32_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i32_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u64_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i64_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_u128_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_i128_le(&mut BoxAsyncRead): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_to_end(&mut BoxAsyncRead, &mut Vec): !Unpin); +async_assert_fn!(tokio::io::AsyncReadExt::read_to_string(&mut BoxAsyncRead, &mut String): !Unpin); +async_assert_fn!(tokio::io::AsyncSeekExt::seek(&mut BoxAsyncSeek, SeekFrom): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write(&mut BoxAsyncWrite, &[u8]): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_all(&mut BoxAsyncWrite, &[u8]): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u8(&mut BoxAsyncWrite, u8): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i8(&mut BoxAsyncWrite, i8): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u16(&mut BoxAsyncWrite, u16): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i16(&mut BoxAsyncWrite, i16): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u32(&mut BoxAsyncWrite, u32): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i32(&mut BoxAsyncWrite, i32): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u64(&mut BoxAsyncWrite, u64): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i64(&mut BoxAsyncWrite, i64): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u128(&mut BoxAsyncWrite, u128): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i128(&mut BoxAsyncWrite, i128): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u16_le(&mut BoxAsyncWrite, u16): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i16_le(&mut BoxAsyncWrite, i16): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u32_le(&mut BoxAsyncWrite, u32): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i32_le(&mut BoxAsyncWrite, i32): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u64_le(&mut BoxAsyncWrite, u64): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i64_le(&mut BoxAsyncWrite, i64): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_u128_le(&mut BoxAsyncWrite, u128): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::write_i128_le(&mut BoxAsyncWrite, i128): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::flush(&mut BoxAsyncWrite): !Unpin); +async_assert_fn!(tokio::io::AsyncWriteExt::shutdown(&mut BoxAsyncWrite): !Unpin);