Skip to content

Commit

Permalink
rt: misc time driver cleanup (#5120)
Browse files Browse the repository at this point in the history
Removes an unnecessary `Arc` and reduces internal state clones.
  • Loading branch information
carllerche committed Oct 24, 2022
1 parent b248be2 commit 80568df
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 79 deletions.
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/entry.rs
Expand Up @@ -576,7 +576,7 @@ impl TimerEntry {
this.inner().state.poll(cx.waker())
}

fn driver(&self) -> &super::Handle {
pub(crate) fn driver(&self) -> &super::Handle {
self.driver.time()
}
}
Expand Down
17 changes: 2 additions & 15 deletions tokio/src/runtime/time/handle.rs
@@ -1,31 +1,18 @@
use crate::loom::sync::Arc;
use crate::runtime::time::TimeSource;
use std::fmt;

/// Handle to time driver instance.
#[derive(Clone)]
pub(crate) struct Handle {
time_source: TimeSource,
pub(super) inner: Arc<super::Inner>,
pub(super) time_source: TimeSource,
pub(super) inner: super::Inner,
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
let time_source = inner.state.lock().time_source.clone();
Handle { time_source, inner }
}

/// Returns the time source associated with this handle.
pub(crate) fn time_source(&self) -> &TimeSource {
&self.time_source
}

/// Access the driver's inner structure.
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}

/// Checks whether the driver has been shutdown.
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
Expand Down
94 changes: 41 additions & 53 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -19,7 +19,7 @@ pub(crate) use source::TimeSource;
mod wheel;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
Expand Down Expand Up @@ -84,20 +84,8 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver {
/// Timing backend in use.
time_source: TimeSource,

/// Parker to delegate to.
park: IoStack,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
Expand All @@ -108,15 +96,18 @@ struct Inner {
/// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
did_wake: AtomicBool,
}

/// Time state shared which must be protected by a `Mutex`
struct InnerState {
/// Timing backend in use.
time_source: TimeSource,

/// The last published timer `elapsed` value.
elapsed: u64,

Expand All @@ -137,31 +128,23 @@ impl Driver {
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

#[cfg(feature = "test-util")]
let did_wake = Arc::new(AtomicBool::new(false));

let inner = Arc::new(Inner {
state: Mutex::new(InnerState {
time_source: time_source.clone(),
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: did_wake.clone(),
});

let handle = Handle::new(inner);

let driver = Driver {
let handle = Handle {
time_source,
park,
#[cfg(feature = "test-util")]
did_wake,
inner: Inner {
state: Mutex::new(InnerState {
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
},
};

let driver = Driver { park };

(driver, handle)
}

Expand All @@ -180,7 +163,7 @@ impl Driver {
return;
}

handle.get().is_shutdown.store(true, Ordering::SeqCst);
handle.inner.is_shutdown.store(true, Ordering::SeqCst);

// Advance time forward to the end of time.

Expand All @@ -191,7 +174,7 @@ impl Driver {

fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
let handle = rt_handle.time();
let mut lock = handle.get().state.lock();
let mut lock = handle.inner.state.lock();

assert!(!handle.is_shutdown());

Expand All @@ -203,11 +186,13 @@ impl Driver {

match next_wake {
Some(when) => {
let now = self.time_source.now();
let now = handle.time_source.now();
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
let mut duration = handle
.time_source
.tick_to_duration(when.saturating_sub(now));

if duration > Duration::from_millis(0) {
if let Some(limit) = limit {
Expand All @@ -234,7 +219,8 @@ impl Driver {

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let clock = &self.time_source.clock;
let handle = rt_handle.time();
let clock = &handle.time_source.clock;

if clock.is_paused() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand All @@ -243,18 +229,14 @@ impl Driver {
// before the "duration" elapsed (usually caused by a
// yield in `Runtime::block_on`). In this case, we don't
// advance the clock.
if !self.did_wake() {
if !handle.did_wake() {
// Simulate advancing time
clock.advance(duration);
}
} else {
self.park.park_timeout(rt_handle, duration);
}
}

fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}

cfg_not_test_util! {
Expand All @@ -276,7 +258,7 @@ impl Handle {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

let mut lock = self.get().lock();
let mut lock = self.inner.lock();

if now < lock.elapsed {
// Time went backwards! This normally shouldn't happen as the Rust language
Expand Down Expand Up @@ -307,7 +289,7 @@ impl Handle {

waker_idx = 0;

lock = self.get().lock();
lock = self.inner.lock();
}
}
}
Expand Down Expand Up @@ -338,7 +320,7 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.get().lock();
let mut lock = self.inner.lock();

if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
Expand All @@ -361,7 +343,7 @@ impl Handle {
entry: NonNull<TimerShared>,
) {
let waker = unsafe {
let mut lock = self.get().lock();
let mut lock = self.inner.lock();

// We may have raced with a firing/deregistration, so check before
// deregistering.
Expand Down Expand Up @@ -408,6 +390,12 @@ impl Handle {
waker.wake();
}
}

cfg_test_util! {
fn did_wake(&self) -> bool {
self.inner.did_wake.swap(false, Ordering::SeqCst)
}
}
}

// ===== impl Inner =====
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/source.rs
Expand Up @@ -3,7 +3,7 @@ use crate::time::{Clock, Duration, Instant};
use std::convert::TryInto;

/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) struct TimeSource {
pub(crate) clock: Clock,
start_time: Instant,
Expand Down
15 changes: 6 additions & 9 deletions tokio/src/time/sleep.rs
@@ -1,5 +1,3 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::runtime::time::TimeSource;
use crate::runtime::time::TimerEntry;
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;
Expand Down Expand Up @@ -239,7 +237,6 @@ cfg_trace! {
struct Inner {
deadline: Instant,
ctx: trace::AsyncOpTracingCtx,
time_source: TimeSource,
}
}

Expand All @@ -266,7 +263,7 @@ impl Sleep {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = &handle.time();
let time_source = handle.time_source().clone();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());

Expand Down Expand Up @@ -303,7 +300,6 @@ impl Sleep {
Inner {
deadline,
ctx,
time_source,
}
};

Expand Down Expand Up @@ -374,8 +370,8 @@ impl Sleep {
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
let mut me = self.project();
me.entry.as_mut().reset(deadline);
(*me.inner).deadline = deadline;

#[cfg(all(tokio_unstable, feature = "tracing"))]
Expand All @@ -389,8 +385,9 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let now = me.inner.time_source.now();
let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
let time_source = me.entry.driver().time_source();
let now = time_source.now();
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
};

Expand Down

0 comments on commit 80568df

Please sign in to comment.