diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f0d613a3bb4..dc4c96fa680 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -126,7 +126,7 @@ impl StateCell { fn when(&self) -> Option { let cur_state = self.state.load(Ordering::Relaxed); - if cur_state == u64::MAX { + if cur_state >= STATE_MIN_VALUE { None } else { Some(cur_state) @@ -563,10 +563,6 @@ impl TimerEntry { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.driver().is_shutdown() { - panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); - } - if let Some(deadline) = self.initial_deadline { self.as_mut().reset(deadline); } diff --git a/tokio/src/runtime/time/handle.rs b/tokio/src/runtime/time/handle.rs index fce791d998c..36515d67f00 100644 --- a/tokio/src/runtime/time/handle.rs +++ b/tokio/src/runtime/time/handle.rs @@ -13,11 +13,6 @@ impl Handle { &self.time_source } - /// Checks whether the driver has been shutdown. - pub(super) fn is_shutdown(&self) -> bool { - self.inner.is_shutdown() - } - /// Track that the driver is being unparked pub(crate) fn unpark(&self) { #[cfg(feature = "test-util")] diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 240f8f16e6d..9647a0d56d4 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -93,9 +93,6 @@ struct Inner { // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex pub(super) state: Mutex, - /// True if the driver is being shutdown. - pub(super) is_shutdown: AtomicBool, - // When `true`, a call to `park_timeout` should immediately return and time // should not advance. One reason for this to be `true` is if the task // passed to `Runtime::block_on` called `task::yield_now()`. @@ -116,6 +113,9 @@ struct InnerState { /// Timer wheel. wheel: wheel::Wheel, + + /// True if the driver is being shutdown. + is_shutdown: bool, } // ===== impl Driver ===== @@ -135,8 +135,8 @@ impl Driver { elapsed: 0, next_wake: None, wheel: wheel::Wheel::new(), + is_shutdown: false, }), - is_shutdown: AtomicBool::new(false), #[cfg(feature = "test-util")] did_wake: AtomicBool::new(false), @@ -159,15 +159,17 @@ impl Driver { pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { let handle = rt_handle.time(); - if handle.is_shutdown() { - return; - } + { + let mut lock = handle.inner.lock(); - handle.inner.is_shutdown.store(true, Ordering::SeqCst); + if lock.is_shutdown { + return; + } - // Advance time forward to the end of time. + lock.is_shutdown = true; + } - handle.process_at_time(u64::MAX); + handle.process_at_time(None); self.park.shutdown(rt_handle); } @@ -176,7 +178,7 @@ impl Driver { let handle = rt_handle.time(); let mut lock = handle.inner.state.lock(); - assert!(!handle.is_shutdown()); + assert!(!lock.is_shutdown); let next_wake = lock.wheel.next_expiration_time(); lock.next_wake = @@ -251,10 +253,16 @@ impl Handle { pub(self) fn process(&self) { let now = self.time_source().now(); - self.process_at_time(now) + self.process_at_time(Some(now)) } - pub(self) fn process_at_time(&self, mut now: u64) { + pub(self) fn process_at_time(&self, now: Option) { + let (mut now, state) = match now { + Some(now) => (now, Ok(())), + // Runtime being shutdown, advance time forward to the end of time. + None => (u64::MAX, Err(Error::shutdown())), + }; + let mut waker_list: [Option; 32] = Default::default(); let mut waker_idx = 0; @@ -274,7 +282,7 @@ impl Handle { debug_assert!(unsafe { entry.is_pending() }); // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. - if let Some(waker) = unsafe { entry.fire(Ok(())) } { + if let Some(waker) = unsafe { entry.fire(state) } { waker_list[waker_idx] = Some(waker); waker_idx += 1; @@ -354,8 +362,11 @@ impl Handle { // Now that we have exclusive control of this entry, mint a handle to reinsert it. let entry = entry.as_ref().handle(); - if self.is_shutdown() { - unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } + if lock.is_shutdown { + unsafe { + entry.set_expiration(0); + entry.fire(Err(Error::shutdown())) + } } else { entry.set_expiration(new_tick); @@ -405,11 +416,6 @@ impl Inner { pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { self.state.lock() } - - // Check whether the driver has been shutdown - pub(super) fn is_shutdown(&self) -> bool { - self.is_shutdown.load(Ordering::SeqCst) - } } impl fmt::Debug for Inner { diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 88c7d768d46..5896ee769e1 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -67,7 +67,7 @@ fn single_timer() { // This may or may not return Some (depending on how it races with the // thread). If it does return None, however, the timer should complete // synchronously. - handle.process_at_time(handle.time_source().now() + 2_000_000_000); + handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000)); jh.join().unwrap(); }) @@ -100,7 +100,7 @@ fn drop_timer() { let handle = handle.inner.driver().time(); // advance 2s in the future. - handle.process_at_time(handle.time_source().now() + 2_000_000_000); + handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000)); jh.join().unwrap(); }) @@ -135,7 +135,7 @@ fn change_waker() { let handle = handle.inner.driver().time(); // advance 2s - handle.process_at_time(handle.time_source().now() + 2_000_000_000); + handle.process_at_time(Some(handle.time_source().now() + 2_000_000_000)); jh.join().unwrap(); }) @@ -177,19 +177,19 @@ fn reset_future() { let handle = handle.inner.driver().time(); // This may or may not return a wakeup time. - handle.process_at_time( + handle.process_at_time(Some( handle .time_source() .instant_to_tick(start + Duration::from_millis(1500)), - ); + )); assert!(!finished_early.load(Ordering::Relaxed)); - handle.process_at_time( + handle.process_at_time(Some( handle .time_source() .instant_to_tick(start + Duration::from_millis(2500)), - ); + )); jh.join().unwrap(); @@ -228,7 +228,7 @@ fn poll_process_levels() { } for t in 1..normal_or_miri(1024, 64) { - handle.inner.driver().time().process_at_time(t as u64); + handle.inner.driver().time().process_at_time(Some(t as u64)); for (deadline, future) in entries.iter_mut().enumerate() { let mut context = Context::from_waker(noop_waker_ref()); @@ -257,8 +257,8 @@ fn poll_process_levels_targeted() { let handle = handle.inner.driver().time(); - handle.process_at_time(62); + handle.process_at_time(Some(62)); assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); - handle.process_at_time(192); - handle.process_at_time(192); + handle.process_at_time(Some(192)); + handle.process_at_time(Some(192)); } diff --git a/tokio/src/time/error.rs b/tokio/src/time/error.rs index 71344d43487..419b7520a59 100644 --- a/tokio/src/time/error.rs +++ b/tokio/src/time/error.rs @@ -92,7 +92,7 @@ impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { use self::Kind::*; let descr = match self.0 { - Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime", + Shutdown => crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, AtCapacity => "timer is at capacity and cannot create a new entry", Invalid => "timer duration exceeds maximum duration", }; diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs index 5ec783e5588..4cf2adbcd9f 100644 --- a/tokio/tests/rt_handle_block_on.rs +++ b/tokio/tests/rt_handle_block_on.rs @@ -467,6 +467,25 @@ multi_threaded_rt_test! { Handle::current().block_on(f); } + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_polled_before_shutdown_panics() { + use futures::future::poll_immediate; + + let rt = rt(); + let _enter = rt.enter(); + + let f = time::sleep(Duration::from_millis(100)); + tokio::pin!(f); + Handle::current().block_on(async { + assert!(poll_immediate(f.as_mut()).await.is_none()); + }); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(f); + } + #[test] #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] fn sleep_after_shutdown_panics() {