Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove frequent global is_shutdown flag check on timer operations #5301

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 1 addition & 5 deletions tokio/src/runtime/time/entry.rs
Expand Up @@ -126,7 +126,7 @@ impl StateCell {
fn when(&self) -> Option<u64> {
let cur_state = self.state.load(Ordering::Relaxed);

if cur_state == u64::MAX {
if cur_state >= STATE_MIN_VALUE {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like you've also included a fix for #5082 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems so, though I only discovered the issue just now. While working on this issue I introduced several more mistakes in this module and cleaned most of them up, but this line survived. If you think it's better not to mix #5082 into this PR, I'll revert it.

None
} else {
Some(cur_state)
Expand Down Expand Up @@ -563,10 +563,6 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
if self.driver().is_shutdown() {
panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
}

if let Some(deadline) = self.initial_deadline {
self.as_mut().reset(deadline);
}
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/runtime/time/handle.rs
Expand Up @@ -13,11 +13,6 @@ impl Handle {
&self.time_source
}

/// Checks whether the driver has been shutdown.
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}

/// Track that the driver is being unparked
pub(crate) fn unpark(&self) {
#[cfg(feature = "test-util")]
Expand Down
49 changes: 28 additions & 21 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -18,6 +18,7 @@ pub(crate) use source::TimeSource;

mod wheel;

#[cfg(feature = "test-util")]
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
Expand Down Expand Up @@ -93,9 +94,6 @@ struct Inner {
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
pub(super) state: Mutex<InnerState>,

/// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,
Comment on lines -96 to -97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember this being an atomic, and I assumed it wasn't when I made the comment that the time driver has a similar issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. It's questionable whether this change improves performance at all. To be honest, I have no idea how to benchmark just the timers in isolation.


// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
Expand All @@ -116,6 +114,9 @@ struct InnerState {

/// Timer wheel.
wheel: wheel::Wheel,

/// True if the driver is being shutdown.
is_shutdown: bool,
}

// ===== impl Driver =====
Expand All @@ -135,8 +136,8 @@ impl Driver {
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
is_shutdown: false,
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
Expand All @@ -159,15 +160,17 @@ impl Driver {
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.time();

if handle.is_shutdown() {
return;
}
{
let mut lock = handle.inner.lock();

handle.inner.is_shutdown.store(true, Ordering::SeqCst);
if lock.is_shutdown {
return;
}

// Advance time forward to the end of time.
lock.is_shutdown = true;
}

handle.process_at_time(u64::MAX);
handle.process_at_time(None);

self.park.shutdown(rt_handle);
}
Expand All @@ -176,7 +179,7 @@ impl Driver {
let handle = rt_handle.time();
let mut lock = handle.inner.state.lock();

assert!(!handle.is_shutdown());
assert!(!lock.is_shutdown);

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
Expand Down Expand Up @@ -251,10 +254,16 @@ impl Handle {
pub(self) fn process(&self) {
let now = self.time_source().now();

self.process_at_time(now)
self.process_at_time(Some(now))
}

pub(self) fn process_at_time(&self, mut now: u64) {
pub(self) fn process_at_time(&self, now: Option<u64>) {
let (mut now, state) = match now {
Some(now) => (now, Ok(())),
// Runtime being shutdown, advance time forward to the end of time.
None => (u64::MAX, Err(Error::shutdown())),
};

let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

Expand All @@ -274,7 +283,7 @@ impl Handle {
debug_assert!(unsafe { entry.is_pending() });

// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
if let Some(waker) = unsafe { entry.fire(state) } {
waker_list[waker_idx] = Some(waker);

waker_idx += 1;
Expand Down Expand Up @@ -354,8 +363,11 @@ impl Handle {
// Now that we have exclusive control of this entry, mint a handle to reinsert it.
let entry = entry.as_ref().handle();

if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
if lock.is_shutdown {
unsafe {
entry.set_expiration(0);
entry.fire(Err(Error::shutdown()))
}
} else {
entry.set_expiration(new_tick);

Expand Down Expand Up @@ -405,11 +417,6 @@ impl Inner {
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
self.state.lock()
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
}

impl fmt::Debug for Inner {
Expand Down
22 changes: 11 additions & 11 deletions tokio/src/runtime/time/tests/mod.rs
Expand Up @@ -67,7 +67,7 @@ fn single_timer() {
// This may or may not return Some (depending on how it races with the
// thread). If it does return None, however, the timer should complete
// synchronously.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000));

jh.join().unwrap();
})
Expand Down Expand Up @@ -100,7 +100,7 @@ fn drop_timer() {
let handle = handle.inner.driver().time();

// advance 2s in the future.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000));

jh.join().unwrap();
})
Expand Down Expand Up @@ -135,7 +135,7 @@ fn change_waker() {
let handle = handle.inner.driver().time();

// advance 2s
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000));

jh.join().unwrap();
})
Expand Down Expand Up @@ -177,19 +177,19 @@ fn reset_future() {
let handle = handle.inner.driver().time();

// This may or may not return a wakeup time.
handle.process_at_time(
handle.process_at_time(Some(
handle
.time_source()
.instant_to_tick(start + Duration::from_millis(1500)),
);
));

assert!(!finished_early.load(Ordering::Relaxed));

handle.process_at_time(
handle.process_at_time(Some(
handle
.time_source()
.instant_to_tick(start + Duration::from_millis(2500)),
);
));

jh.join().unwrap();

Expand Down Expand Up @@ -228,7 +228,7 @@ fn poll_process_levels() {
}

for t in 1..normal_or_miri(1024, 64) {
handle.inner.driver().time().process_at_time(t as u64);
handle.inner.driver().time().process_at_time(Some(t as u64));

for (deadline, future) in entries.iter_mut().enumerate() {
let mut context = Context::from_waker(noop_waker_ref());
Expand Down Expand Up @@ -257,8 +257,8 @@ fn poll_process_levels_targeted() {

let handle = handle.inner.driver().time();

handle.process_at_time(62);
handle.process_at_time(Some(62));
assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
handle.process_at_time(192);
handle.process_at_time(192);
handle.process_at_time(Some(192));
handle.process_at_time(Some(192));
}
2 changes: 1 addition & 1 deletion tokio/src/time/error.rs
Expand Up @@ -92,7 +92,7 @@ impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use self::Kind::*;
let descr = match self.0 {
Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime",
Shutdown => crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
AtCapacity => "timer is at capacity and cannot create a new entry",
Invalid => "timer duration exceeds maximum duration",
};
Expand Down
19 changes: 19 additions & 0 deletions tokio/tests/rt_handle_block_on.rs
Expand Up @@ -467,6 +467,25 @@ multi_threaded_rt_test! {
Handle::current().block_on(f);
}

#[test]
#[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
// Verifies that a `Sleep` future which was polled (and thus registered with
// the timer driver) *before* the runtime shut down still panics with the
// shutdown message when polled again *after* shutdown.
fn sleep_polled_before_shutdown_panics() {
    use futures::future::poll_immediate;

    let rt = rt();
    let _enter = rt.enter();

    // Create the sleep inside the runtime context and poll it exactly once so
    // it registers with the timer driver; it must still be pending here.
    let f = time::sleep(Duration::from_millis(100));
    tokio::pin!(f);
    Handle::current().block_on(async {
        // `poll_immediate` yields `None` while the future is still pending.
        assert!(poll_immediate(f.as_mut()).await.is_none());
    });

    // Shut the runtime down. The large timeout only bounds how long we wait
    // for shutdown to complete; it is not expected to be reached.
    rt.shutdown_timeout(Duration::from_secs(1000));

    // Polling the already-registered timer after shutdown must panic with the
    // message asserted in `should_panic` above.
    Handle::current().block_on(f);
}

#[test]
#[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
fn sleep_after_shutdown_panics() {
Expand Down