Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

rt: misc time driver cleanup #5120

Merged
merged 1 commit into from Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/entry.rs
Expand Up @@ -576,7 +576,7 @@ impl TimerEntry {
this.inner().state.poll(cx.waker())
}

fn driver(&self) -> &super::Handle {
pub(crate) fn driver(&self) -> &super::Handle {
self.driver.time()
}
}
Expand Down
17 changes: 2 additions & 15 deletions tokio/src/runtime/time/handle.rs
@@ -1,31 +1,18 @@
use crate::loom::sync::Arc;
use crate::runtime::time::TimeSource;
use std::fmt;

/// Handle to the time driver instance.
#[derive(Clone)]
pub(crate) struct Handle {
time_source: TimeSource,
pub(super) inner: Arc<super::Inner>,
pub(super) time_source: TimeSource,
pub(super) inner: super::Inner,
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
let time_source = inner.state.lock().time_source.clone();
Handle { time_source, inner }
}

/// Returns the time source associated with this handle.
pub(crate) fn time_source(&self) -> &TimeSource {
&self.time_source
}

/// Access the driver's inner structure.
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}

/// Checks whether the driver has been shutdown.
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
Expand Down
94 changes: 41 additions & 53 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -19,7 +19,7 @@ pub(crate) use source::TimeSource;
mod wheel;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
Expand Down Expand Up @@ -84,20 +84,8 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver {
/// Timing backend in use.
time_source: TimeSource,

/// Parker to delegate to.
park: IoStack,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
Expand All @@ -108,15 +96,18 @@ struct Inner {
/// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
did_wake: AtomicBool,
}

/// Shared time state which must be protected by a `Mutex`.
struct InnerState {
/// Timing backend in use.
time_source: TimeSource,

/// The last published timer `elapsed` value.
elapsed: u64,

Expand All @@ -137,31 +128,23 @@ impl Driver {
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

#[cfg(feature = "test-util")]
let did_wake = Arc::new(AtomicBool::new(false));

let inner = Arc::new(Inner {
state: Mutex::new(InnerState {
time_source: time_source.clone(),
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: did_wake.clone(),
});

let handle = Handle::new(inner);

let driver = Driver {
let handle = Handle {
time_source,
park,
#[cfg(feature = "test-util")]
did_wake,
inner: Inner {
state: Mutex::new(InnerState {
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
},
};

let driver = Driver { park };

(driver, handle)
}

Expand All @@ -180,7 +163,7 @@ impl Driver {
return;
}

handle.get().is_shutdown.store(true, Ordering::SeqCst);
handle.inner.is_shutdown.store(true, Ordering::SeqCst);

// Advance time forward to the end of time.

Expand All @@ -191,7 +174,7 @@ impl Driver {

fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
let handle = rt_handle.time();
let mut lock = handle.get().state.lock();
let mut lock = handle.inner.state.lock();

assert!(!handle.is_shutdown());

Expand All @@ -203,11 +186,13 @@ impl Driver {

match next_wake {
Some(when) => {
let now = self.time_source.now();
let now = handle.time_source.now();
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
let mut duration = handle
.time_source
.tick_to_duration(when.saturating_sub(now));

if duration > Duration::from_millis(0) {
if let Some(limit) = limit {
Expand All @@ -234,7 +219,8 @@ impl Driver {

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let clock = &self.time_source.clock;
let handle = rt_handle.time();
let clock = &handle.time_source.clock;

if clock.is_paused() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand All @@ -243,18 +229,14 @@ impl Driver {
// before the "duration" elapsed (usually caused by a
// yield in `Runtime::block_on`). In this case, we don't
// advance the clock.
if !self.did_wake() {
if !handle.did_wake() {
// Simulate advancing time
clock.advance(duration);
}
} else {
self.park.park_timeout(rt_handle, duration);
}
}

fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}

cfg_not_test_util! {
Expand All @@ -276,7 +258,7 @@ impl Handle {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

let mut lock = self.get().lock();
let mut lock = self.inner.lock();

if now < lock.elapsed {
// Time went backwards! This normally shouldn't happen as the Rust language
Expand Down Expand Up @@ -307,7 +289,7 @@ impl Handle {

waker_idx = 0;

lock = self.get().lock();
lock = self.inner.lock();
}
}
}
Expand Down Expand Up @@ -338,7 +320,7 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.get().lock();
let mut lock = self.inner.lock();

if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
Expand All @@ -361,7 +343,7 @@ impl Handle {
entry: NonNull<TimerShared>,
) {
let waker = unsafe {
let mut lock = self.get().lock();
let mut lock = self.inner.lock();

// We may have raced with a firing/deregistration, so check before
// deregistering.
Expand Down Expand Up @@ -408,6 +390,12 @@ impl Handle {
waker.wake();
}
}

cfg_test_util! {
fn did_wake(&self) -> bool {
self.inner.did_wake.swap(false, Ordering::SeqCst)
}
}
}

// ===== impl Inner =====
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/source.rs
Expand Up @@ -3,7 +3,7 @@ use crate::time::{Clock, Duration, Instant};
use std::convert::TryInto;

/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) struct TimeSource {
pub(crate) clock: Clock,
start_time: Instant,
Expand Down
15 changes: 6 additions & 9 deletions tokio/src/time/sleep.rs
@@ -1,5 +1,3 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::runtime::time::TimeSource;
use crate::runtime::time::TimerEntry;
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;
Expand Down Expand Up @@ -239,7 +237,6 @@ cfg_trace! {
struct Inner {
deadline: Instant,
ctx: trace::AsyncOpTracingCtx,
time_source: TimeSource,
}
}

Expand All @@ -266,7 +263,7 @@ impl Sleep {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = &handle.time();
let time_source = handle.time_source().clone();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());

Expand Down Expand Up @@ -303,7 +300,6 @@ impl Sleep {
Inner {
deadline,
ctx,
time_source,
}
};

Expand Down Expand Up @@ -374,8 +370,8 @@ impl Sleep {
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
let mut me = self.project();
me.entry.as_mut().reset(deadline);
(*me.inner).deadline = deadline;

#[cfg(all(tokio_unstable, feature = "tracing"))]
Expand All @@ -389,8 +385,9 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let now = me.inner.time_source.now();
let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
let time_source = me.entry.driver().time_source();
let now = time_source.now();
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
};

Expand Down