Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: move time driver into runtime module #4983

Merged
merged 1 commit into from Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions tokio/src/lib.rs
Expand Up @@ -497,9 +497,8 @@ cfg_rt! {
pub mod runtime;
}
cfg_not_rt! {
cfg_io_driver_impl! {
pub(crate) mod runtime;
}
// The `runtime` module is used when the IO or time driver is needed.
pub(crate) mod runtime;
}

pub(crate) mod coop;
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/driver.rs
Expand Up @@ -98,10 +98,10 @@ cfg_not_process_driver! {
// ===== time driver =====

cfg_time! {
type TimeDriver = crate::park::either::Either<crate::time::driver::Driver<IoStack>, IoStack>;
type TimeDriver = crate::park::either::Either<crate::runtime::time::Driver<IoStack>, IoStack>;

pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;

fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
crate::time::Clock::new(enable_pausing, start_paused)
Expand All @@ -115,7 +115,7 @@ cfg_time! {
use crate::park::either::Either;

if enable {
let driver = crate::time::driver::Driver::new(io_stack, clock);
let driver = crate::runtime::time::Driver::new(io_stack, clock);
let handle = driver.handle();

(Either::A(driver), Some(handle))
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -181,6 +181,10 @@ cfg_io_driver_impl! {
pub(crate) mod io;
}

cfg_time! {
pub(crate) mod time;
}

cfg_rt! {
pub(crate) mod enter;

Expand Down
Expand Up @@ -283,7 +283,7 @@ impl StateCell {
/// timer. As this participates in intrusive data structures, it must be pinned
/// before polling.
#[derive(Debug)]
pub(super) struct TimerEntry {
pub(crate) struct TimerEntry {
/// Arc reference to the driver. We can only free the driver after
/// deregistering everything from their respective timer wheels.
driver: Handle,
Expand Down
@@ -1,5 +1,5 @@
use crate::loom::sync::Arc;
use crate::time::driver::ClockTime;
use crate::runtime::time::ClockTime;
use std::fmt;

/// Handle to time driver instance.
Expand All @@ -17,7 +17,7 @@ impl Handle {
}

/// Returns the time source associated with this handle.
pub(super) fn time_source(&self) -> &ClockTime {
pub(crate) fn time_source(&self) -> &ClockTime {
&self.time_source
}

Expand Down
15 changes: 7 additions & 8 deletions tokio/src/time/driver/mod.rs → tokio/src/runtime/time/mod.rs
Expand Up @@ -7,15 +7,14 @@
//! Time driver.

mod entry;
pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
pub(crate) use entry::TimerEntry;
use entry::{EntryList, TimerHandle, TimerShared};

mod handle;
pub(crate) use self::handle::Handle;

mod wheel;

pub(super) mod sleep;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
Expand Down Expand Up @@ -104,8 +103,8 @@ pub(crate) struct Driver<P: Park + 'static> {

/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug, Clone)]
pub(self) struct ClockTime {
clock: super::clock::Clock,
pub(crate) struct ClockTime {
clock: crate::time::Clock,
start_time: Instant,
}

Expand All @@ -117,7 +116,7 @@ impl ClockTime {
}
}

pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
// Round up to the end of a ms
self.instant_to_tick(t + Duration::from_nanos(999_999))
}
Expand All @@ -136,7 +135,7 @@ impl ClockTime {
Duration::from_millis(t)
}

pub(self) fn now(&self) -> u64 {
pub(crate) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
}
}
Expand Down Expand Up @@ -403,7 +402,7 @@ impl Handle {

None
}
Err((entry, super::error::InsertError::Elapsed)) => unsafe {
Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
entry.fire(Ok(()))
},
}
Expand Down
Expand Up @@ -48,7 +48,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
#[test]
fn single_timer() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -79,7 +79,7 @@ fn single_timer() {
#[test]
fn drop_timer() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -110,7 +110,7 @@ fn drop_timer() {
#[test]
fn change_waker() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -145,7 +145,7 @@ fn reset_future() {
model(|| {
let finished_early = Arc::new(AtomicBool::new(false));

let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -201,7 +201,7 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
clock.pause();

let time_source = super::ClockTime::new(clock.clone());
Expand Down Expand Up @@ -242,7 +242,7 @@ fn poll_process_levels() {
fn poll_process_levels_targeted() {
let mut context = Context::from_waker(noop_waker_ref());

let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
clock.pause();

let time_source = super::ClockTime::new(clock.clone());
Expand All @@ -258,46 +258,3 @@ fn poll_process_levels_targeted() {
handle.process_at_time(192);
handle.process_at_time(192);
}

/*
#[test]
fn balanced_incr_and_decr() {
const OPS: usize = 5;

fn incr(inner: Arc<Inner>) {
for _ in 0..OPS {
inner.increment().expect("increment should not have failed");
thread::yield_now();
}
}

fn decr(inner: Arc<Inner>) {
let mut ops_performed = 0;
while ops_performed < OPS {
if inner.num(Ordering::Relaxed) > 0 {
ops_performed += 1;
inner.decrement();
}
thread::yield_now();
}
}

loom::model(|| {
let unpark = Box::new(MockUnpark);
let instant = Instant::now();

let inner = Arc::new(Inner::new(instant, unpark));

let incr_inner = inner.clone();
let decr_inner = inner.clone();

let incr_handle = thread::spawn(move || incr(incr_inner));
let decr_handle = thread::spawn(move || decr(decr_inner));

incr_handle.join().expect("should never fail");
decr_handle.join().expect("should never fail");

assert_eq!(inner.num(Ordering::SeqCst), 0);
})
}
*/
@@ -1,6 +1,4 @@
use crate::time::driver::TimerHandle;

use crate::time::driver::{EntryList, TimerShared};
use crate::runtime::time::{EntryList, TimerHandle, TimerShared};

use std::{fmt, ptr::NonNull};

Expand Down
@@ -1,4 +1,4 @@
use crate::time::driver::{TimerHandle, TimerShared};
use crate::runtime::time::{TimerHandle, TimerShared};
use crate::time::error::InsertError;

mod level;
Expand Down
13 changes: 4 additions & 9 deletions tokio/src/time/mod.rs
Expand Up @@ -82,17 +82,13 @@
//! ```
//!
//! [`interval`]: crate::time::interval()
//! [`sleep`]: sleep()

mod clock;
pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};

pub(crate) mod driver;

#[doc(inline)]
pub use driver::sleep::{sleep, sleep_until, Sleep};

pub mod error;

mod instant;
Expand All @@ -101,14 +97,13 @@ pub use self::instant::Instant;
mod interval;
pub use interval::{interval, interval_at, Interval, MissedTickBehavior};

mod sleep;
pub use sleep::{sleep, sleep_until, Sleep};

mod timeout;
#[doc(inline)]
pub use timeout::{timeout, timeout_at, Timeout};

#[cfg(test)]
#[cfg(not(loom))]
mod tests;

// Re-export for convenience
#[doc(no_inline)]
pub use std::time::Duration;
4 changes: 2 additions & 2 deletions tokio/src/time/driver/sleep.rs → tokio/src/time/sleep.rs
@@ -1,6 +1,6 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::time::driver::ClockTime;
use crate::time::driver::{Handle, TimerEntry};
use crate::runtime::time::ClockTime;
use crate::runtime::time::{Handle, TimerEntry};
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;

Expand Down
22 changes: 0 additions & 22 deletions tokio/src/time/tests/mod.rs

This file was deleted.