Skip to content

Commit

Permalink
remove frequent global is_shutdown flag check on timer operations
Browse files Browse the repository at this point in the history
Make timer driver's is_shutdown flag nonatomic and move it into the driver lock.
On shtudown, the driver wake the all pending timer with shutdown error.

Simillar to the tokio-rs#5300, but for timer driver not io driver.

Refs: tokio-rs#5227, tokio-rs#5300
  • Loading branch information
HyeonuPark committed Dec 17, 2022
1 parent 42db755 commit f13a63c
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 43 deletions.
6 changes: 1 addition & 5 deletions tokio/src/runtime/time/entry.rs
Expand Up @@ -126,7 +126,7 @@ impl StateCell {
fn when(&self) -> Option<u64> {
let cur_state = self.state.load(Ordering::Relaxed);

if cur_state == u64::MAX {
if cur_state >= STATE_MIN_VALUE {
None
} else {
Some(cur_state)
Expand Down Expand Up @@ -563,10 +563,6 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
if self.driver().is_shutdown() {
panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
}

if let Some(deadline) = self.initial_deadline {
self.as_mut().reset(deadline);
}
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/runtime/time/handle.rs
Expand Up @@ -13,11 +13,6 @@ impl Handle {
&self.time_source
}

/// Checks whether the driver has been shutdown.
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}

/// Track that the driver is being unparked
pub(crate) fn unpark(&self) {
#[cfg(feature = "test-util")]
Expand Down
48 changes: 27 additions & 21 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -93,9 +93,6 @@ struct Inner {
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
pub(super) state: Mutex<InnerState>,

/// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
Expand All @@ -116,6 +113,9 @@ struct InnerState {

/// Timer wheel.
wheel: wheel::Wheel,

/// True if the driver is being shutdown.
is_shutdown: bool,
}

// ===== impl Driver =====
Expand All @@ -135,8 +135,8 @@ impl Driver {
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
is_shutdown: false,
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
Expand All @@ -159,15 +159,17 @@ impl Driver {
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.time();

if handle.is_shutdown() {
return;
}
{
let mut lock = handle.inner.lock();

handle.inner.is_shutdown.store(true, Ordering::SeqCst);
if lock.is_shutdown {
return;
}

// Advance time forward to the end of time.
lock.is_shutdown = true;
}

handle.process_at_time(u64::MAX);
handle.process_at_time(None);

self.park.shutdown(rt_handle);
}
Expand All @@ -176,7 +178,7 @@ impl Driver {
let handle = rt_handle.time();
let mut lock = handle.inner.state.lock();

assert!(!handle.is_shutdown());
assert!(!lock.is_shutdown);

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
Expand Down Expand Up @@ -251,10 +253,16 @@ impl Handle {
pub(self) fn process(&self) {
let now = self.time_source().now();

self.process_at_time(now)
self.process_at_time(Some(now))
}

pub(self) fn process_at_time(&self, mut now: u64) {
pub(self) fn process_at_time(&self, now: Option<u64>) {
let (mut now, state) = match now {
Some(now) => (now, Ok(())),
// Runtime being shutdown, advance time forward to the end of time.
None => (u64::MAX, Err(Error::shutdown())),
};

let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

Expand All @@ -274,7 +282,7 @@ impl Handle {
debug_assert!(unsafe { entry.is_pending() });

// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
if let Some(waker) = unsafe { entry.fire(state) } {
waker_list[waker_idx] = Some(waker);

waker_idx += 1;
Expand Down Expand Up @@ -354,8 +362,11 @@ impl Handle {
// Now that we have exclusive control of this entry, mint a handle to reinsert it.
let entry = entry.as_ref().handle();

if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
if lock.is_shutdown {
unsafe {
entry.set_expiration(0);
entry.fire(Err(Error::shutdown()))
}
} else {
entry.set_expiration(new_tick);

Expand Down Expand Up @@ -405,11 +416,6 @@ impl Inner {
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
self.state.lock()
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
}

impl fmt::Debug for Inner {
Expand Down
22 changes: 11 additions & 11 deletions tokio/src/runtime/time/tests/mod.rs
Expand Up @@ -67,7 +67,7 @@ fn single_timer() {
// This may or may not return Some (depending on how it races with the
// thread). If it does return None, however, the timer should complete
// synchronously.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000));

jh.join().unwrap();
})
Expand Down Expand Up @@ -100,7 +100,7 @@ fn drop_timer() {
let handle = handle.inner.driver().time();

// advance 2s in the future.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000));

jh.join().unwrap();
})
Expand Down Expand Up @@ -135,7 +135,7 @@ fn change_waker() {
let handle = handle.inner.driver().time();

// advance 2s
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000));

jh.join().unwrap();
})
Expand Down Expand Up @@ -177,19 +177,19 @@ fn reset_future() {
let handle = handle.inner.driver().time();

// This may or may not return a wakeup time.
handle.process_at_time(
handle.process_at_time(Some(
handle
.time_source()
.instant_to_tick(start + Duration::from_millis(1500)),
);
));

assert!(!finished_early.load(Ordering::Relaxed));

handle.process_at_time(
handle.process_at_time(Some(
handle
.time_source()
.instant_to_tick(start + Duration::from_millis(2500)),
);
));

jh.join().unwrap();

Expand Down Expand Up @@ -228,7 +228,7 @@ fn poll_process_levels() {
}

for t in 1..normal_or_miri(1024, 64) {
handle.inner.driver().time().process_at_time(t as u64);
handle.inner.driver().time().process_at_time(Some(t as u64));

for (deadline, future) in entries.iter_mut().enumerate() {
let mut context = Context::from_waker(noop_waker_ref());
Expand Down Expand Up @@ -257,8 +257,8 @@ fn poll_process_levels_targeted() {

let handle = handle.inner.driver().time();

handle.process_at_time(62);
handle.process_at_time(Some(62));
assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
handle.process_at_time(192);
handle.process_at_time(192);
handle.process_at_time(Some(192));
handle.process_at_time(Some(192));
}
2 changes: 1 addition & 1 deletion tokio/src/time/error.rs
Expand Up @@ -92,7 +92,7 @@ impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use self::Kind::*;
let descr = match self.0 {
Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime",
Shutdown => crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
AtCapacity => "timer is at capacity and cannot create a new entry",
Invalid => "timer duration exceeds maximum duration",
};
Expand Down
19 changes: 19 additions & 0 deletions tokio/tests/rt_handle_block_on.rs
Expand Up @@ -467,6 +467,25 @@ multi_threaded_rt_test! {
Handle::current().block_on(f);
}

#[test]
#[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
fn sleep_polled_before_shutdown_panics() {
use futures::future::poll_immediate;

let rt = rt();
let _enter = rt.enter();

let f = time::sleep(Duration::from_millis(100));
tokio::pin!(f);
Handle::current().block_on(async {
assert!(poll_immediate(f.as_mut()).await.is_none());
});

rt.shutdown_timeout(Duration::from_secs(1000));

Handle::current().block_on(f);
}

#[test]
#[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
fn sleep_after_shutdown_panics() {
Expand Down

0 comments on commit f13a63c

Please sign in to comment.