diff --git a/benches/Cargo.toml b/benches/Cargo.toml index c581055cf65..0465cb00e1f 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -90,3 +90,8 @@ harness = false name = "time_now" path = "time_now.rs" harness = false + +[[bench]] +name = "time_timeout" +path = "time_timeout.rs" +harness = false \ No newline at end of file diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs new file mode 100644 index 00000000000..66f3e6f6b9e --- /dev/null +++ b/benches/time_timeout.rs @@ -0,0 +1,62 @@ +//! Benchmark spawning a task onto the basic and threaded Tokio executors. +//! This essentially measure the time to enqueue a task in the local and remote +//! case. + +use std::time::{Duration, Instant}; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use tokio::time::timeout; + +// a vevry quick async task, but might timeout +async fn quick_job() -> usize { + 1 +} + +fn single_thread_scheduler_timeout(c: &mut Criterion) { + do_test(c, 1, "single_thread_timeout"); +} + +fn multi_thread_scheduler_timeout(c: &mut Criterion) { + do_test(c, 8, "multi_thread_timeout-8"); +} + +fn do_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap(); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let h = timeout(Duration::from_secs(1), quick_job()); + assert_eq!(black_box(h.await.unwrap()), 1); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +criterion_group!( + timeout_benchmark, + single_thread_scheduler_timeout, + multi_thread_scheduler_timeout, +); + +criterion_main!(timeout_benchmark); diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 36f6e83c6b1..e7c1b1e5db5 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -256,6 +256,16 @@ impl Sleep { use crate::runtime::scheduler; let handle = scheduler::Handle::current(); + Self::new_timeout_with_handle(deadline, location, handle) + } + + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + #[track_caller] + pub(crate) fn new_timeout_with_handle( + deadline: Instant, + location: Option<&'static Location<'static>>, + handle: crate::runtime::scheduler::Handle, + ) -> Sleep { let entry = TimerEntry::new(&handle, deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 52ab9891c69..3a1b6f90058 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -6,7 +6,7 @@ use crate::{ runtime::coop, - time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, + time::{error::Elapsed, Duration, Instant, Sleep}, util::trace, }; @@ -87,14 +87,7 @@ pub fn timeout(duration: Duration, future: F) -> Timeout where F: Future, { - let location = trace::caller_location(); - - let deadline = Instant::now().checked_add(duration); - let delay = match deadline { - Some(deadline) => Sleep::new_timeout(deadline, location), - None => Sleep::far_future(location), - }; - Timeout::new_with_delay(future, delay) + Timeout::new_with_delay(future, Instant::now().checked_add(duration)) } /// Requires a `Future` to complete before the specified instant in time. @@ -146,11 +139,14 @@ pub fn timeout_at(deadline: Instant, future: F) -> Timeout where F: Future, { - let delay = sleep_until(deadline); + use crate::runtime::scheduler; + let handle = scheduler::Handle::current(); Timeout { value: future, - delay, + deadline: Some(deadline), + delay: None, + handle, } } @@ -162,13 +158,23 @@ pin_project! { #[pin] value: T, #[pin] - delay: Sleep, + delay: Option, + deadline : Option, + handle: crate::runtime::scheduler::Handle, } } impl Timeout { - pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout { - Timeout { value, delay } + pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { + use crate::runtime::scheduler; + let handle = scheduler::Handle::current(); + + Timeout { + value, + deadline, + delay: None, + handle, + } } /// Gets a reference to the underlying value in this timeout. @@ -194,7 +200,7 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let me = self.project(); + let mut me = self.project(); let had_budget_before = coop::has_budget_remaining(); @@ -205,10 +211,25 @@ where let has_budget_now = coop::has_budget_remaining(); + // If the above inner future is ready, the below code will not be executed. + // This lazy initiation is for performance purposes, + // it can avoid unnecessary of `Sleep` creation and drop. + if me.delay.is_none() { + let location = trace::caller_location(); + let delay = match me.deadline { + Some(deadline) => { + Sleep::new_timeout_with_handle(*deadline, location, me.handle.clone()) + } + None => Sleep::far_future(location), + }; + me.delay.as_mut().set(Some(delay)); + } + let delay = me.delay; let poll_delay = || -> Poll { - match delay.poll(cx) { + // Safety: we have just assigned it a value of `Some`. + match delay.as_pin_mut().unwrap().poll(cx) { Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), Poll::Pending => Poll::Pending, }