diff --git a/benches/Cargo.toml b/benches/Cargo.toml index c581055cf65..c1d13bac279 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -90,3 +90,8 @@ harness = false name = "time_now" path = "time_now.rs" harness = false + +[[bench]] +name = "time_timeout" +path = "time_timeout.rs" +harness = false diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs new file mode 100644 index 00000000000..c961477562c --- /dev/null +++ b/benches/time_timeout.rs @@ -0,0 +1,109 @@ +use std::time::{Duration, Instant}; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use tokio::{ + runtime::Runtime, + time::{sleep, timeout}, +}; + +// a very quick async task, but might timeout +async fn quick_job() -> usize { + 1 +} + +fn build_run_time(workers: usize) -> Runtime { + if workers == 1 { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } else { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap() + } +} + +fn single_thread_scheduler_timeout(c: &mut Criterion) { + do_timeout_test(c, 1, "single_thread_timeout"); +} + +fn multi_thread_scheduler_timeout(c: &mut Criterion) { + do_timeout_test(c, 8, "multi_thread_timeout-8"); +} + +fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = build_run_time(workers); + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_timeout_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_timeout_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let h = timeout(Duration::from_secs(1), quick_job()); + assert_eq!(black_box(h.await.unwrap()), 1); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +fn single_thread_scheduler_sleep(c: &mut Criterion) { + do_sleep_test(c, 1, "single_thread_sleep"); +} + +fn multi_thread_scheduler_sleep(c: &mut Criterion) { + do_sleep_test(c, 8, "multi_thread_sleep-8"); +} + +fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = build_run_time(workers); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_sleep_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_sleep_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let _h = black_box(sleep(Duration::from_secs(1))); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +criterion_group!( + timeout_benchmark, + single_thread_scheduler_timeout, + multi_thread_scheduler_timeout, + single_thread_scheduler_sleep, + multi_thread_scheduler_sleep +); + +criterion_main!(timeout_benchmark); diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f6f56e277b2..a6be0e62a13 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -293,7 +293,7 @@ pub(crate) struct TimerEntry { /// /// This is manipulated only under the inner mutex. TODO: Can we use loom /// cells for this? - inner: StdUnsafeCell, + inner: StdUnsafeCell>, /// Deadline for the timer. This is used to register on the first /// poll, as we can't register prior to being pinned. deadline: Instant, @@ -469,23 +469,32 @@ unsafe impl linked_list::Link for TimerShared { impl TimerEntry { #[track_caller] - pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self { + pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { // Panic if the time driver is not enabled let _ = handle.driver().time(); - let driver = handle.clone(); - Self { - driver, - inner: StdUnsafeCell::new(TimerShared::new()), + driver: handle, + inner: StdUnsafeCell::new(None), deadline, registered: false, _m: std::marker::PhantomPinned, } } + fn is_inner_init(&self) -> bool { + unsafe { &*self.inner.get() }.is_some() + } + + // This lazy initialization is for performance purposes. fn inner(&self) -> &TimerShared { - unsafe { &*self.inner.get() } + let inner = unsafe { &*self.inner.get() }; + if inner.is_none() { + unsafe { + *self.inner.get() = Some(TimerShared::new()); + } + } + return inner.as_ref().unwrap(); } pub(crate) fn deadline(&self) -> Instant { @@ -493,11 +502,15 @@ impl TimerEntry { } pub(crate) fn is_elapsed(&self) -> bool { - !self.inner().state.might_be_registered() && self.registered + self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered } /// Cancels and deregisters the timer. This operation is irreversible. pub(crate) fn cancel(self: Pin<&mut Self>) { + // Avoid calling the `clear_entry` method, because it has not been initialized yet. + if !self.is_inner_init() { + return; + } // We need to perform an acq/rel fence with the driver thread, and the // simplest way to do so is to grab the driver lock. // @@ -524,8 +537,9 @@ impl TimerEntry { } pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { - unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time; - unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister; + let this = unsafe { self.as_mut().get_unchecked_mut() }; + this.deadline = new_time; + this.registered = reregister; let tick = self.driver().time_source().deadline_to_tick(new_time); diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index e7ab222ef63..520dc00a462 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -49,7 +49,7 @@ fn single_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -83,7 +83,7 @@ fn drop_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -117,7 +117,7 @@ fn change_waker() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -157,7 +157,7 @@ fn reset_future() { let start = handle.inner.driver().clock().now(); let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1)); + let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1)); pin!(entry); let _ = entry @@ -219,7 +219,7 @@ fn poll_process_levels() { for i in 0..normal_or_miri(1024, 64) { let mut entry = Box::pin(TimerEntry::new( - &handle.inner, + handle.inner.clone(), handle.inner.driver().clock().now() + Duration::from_millis(i), )); @@ -253,7 +253,7 @@ fn poll_process_levels_targeted() { let handle = rt.handle(); let e1 = TimerEntry::new( - &handle.inner, + handle.inner.clone(), handle.inner.driver().clock().now() + Duration::from_millis(193), ); pin!(e1); diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 36f6e83c6b1..9223396fe54 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -254,12 +254,11 @@ impl Sleep { location: Option<&'static Location<'static>>, ) -> Sleep { use crate::runtime::scheduler; - let handle = scheduler::Handle::current(); - let entry = TimerEntry::new(&handle, deadline); - + let entry = TimerEntry::new(handle, deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { + let handle = scheduler::Handle::current(); let clock = handle.driver().clock(); let handle = &handle.driver().time(); let time_source = handle.time_source();