Skip to content

Commit

Permalink
This commit is part of reducing timeout performance overhead.
Browse files Browse the repository at this point in the history
    See tokio-rs#6504

    Below are relevant benchmark results of this PR on m1 mac:
    single_thread_timeout   time:   [21.869 ns 21.987 ns 22.135 ns]
                            change: [-3.4429% -2.0709% -0.8759%] (p = 0.00 < 0.05)
                            Change within noise threshold.
    Found 7 outliers among 100 measurements (7.00%)
      3 (3.00%) high mild
      4 (4.00%) high severe

    multi_thread_timeout-8  time:   [4.4835 ns 4.6138 ns 4.7614 ns]
                            change: [-4.3554% +0.1643% +4.5114%] (p = 0.95 > 0.05)
                            No change in performance detected.
    Found 9 outliers among 100 measurements (9.00%)
      8 (8.00%) high mild
      1 (1.00%) high severe

    Below are relevant benchmark results of current version on m1 mac:

    single_thread_timeout   time:   [40.227 ns 40.416 ns 40.691 ns]
                            change: [+81.321% +82.817% +84.121%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 14 outliers among 100 measurements (14.00%)
      3 (3.00%) high mild
      11 (11.00%) high severe

    multi_thread_timeout-8  time:   [183.16 ns 186.02 ns 188.21 ns]
                            change: [+3765.0% +3880.4% +3987.4%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 10 outliers among 100 measurements (10.00%)
      4 (4.00%) low severe
      6 (6.00%) low mild
  • Loading branch information
wathenjiang committed Apr 23, 2024
1 parent a73d6bf commit bde0742
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 16 deletions.
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Expand Up @@ -90,3 +90,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "time_timeout"
path = "time_timeout.rs"
harness = false
62 changes: 62 additions & 0 deletions benches/time_timeout.rs
@@ -0,0 +1,62 @@
//! Benchmark spawning a task onto the basic and threaded Tokio executors.
//! This essentially measure the time to enqueue a task in the local and remote
//! case.

use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::time::timeout;

// a vevry quick async task, but might timeout
async fn quick_job() -> usize {
1
}

fn single_thread_scheduler_timeout(c: &mut Criterion) {
do_test(c, 1, "single_thread_timeout");
}

fn multi_thread_scheduler_timeout(c: &mut Criterion) {
do_test(c, 8, "multi_thread_timeout-8");
}

fn do_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(workers)
.build()
.unwrap();

c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let h = timeout(Duration::from_secs(1), quick_job());
assert_eq!(black_box(h.await.unwrap()), 1);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

criterion_group!(
timeout_benchmark,
single_thread_scheduler_timeout,
multi_thread_scheduler_timeout,
);

criterion_main!(timeout_benchmark);
10 changes: 10 additions & 0 deletions tokio/src/time/sleep.rs
Expand Up @@ -256,6 +256,16 @@ impl Sleep {
use crate::runtime::scheduler;

let handle = scheduler::Handle::current();
Self::new_timeout_with_handle(deadline, location, handle)
}

#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
#[track_caller]
pub(crate) fn new_timeout_with_handle(
deadline: Instant,
location: Option<&'static Location<'static>>,
handle: crate::runtime::scheduler::Handle,
) -> Sleep {
let entry = TimerEntry::new(&handle, deadline);

#[cfg(all(tokio_unstable, feature = "tracing"))]
Expand Down
53 changes: 37 additions & 16 deletions tokio/src/time/timeout.rs
Expand Up @@ -6,7 +6,7 @@

use crate::{
runtime::coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
time::{error::Elapsed, Duration, Instant, Sleep},
util::trace,
};

Expand Down Expand Up @@ -87,14 +87,7 @@ pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F>
where
F: Future,
{
let location = trace::caller_location();

let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
Timeout::new_with_delay(future, Instant::now().checked_add(duration))
}

/// Requires a `Future` to complete before the specified instant in time.
Expand Down Expand Up @@ -146,11 +139,14 @@ pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F>
where
F: Future,
{
let delay = sleep_until(deadline);
use crate::runtime::scheduler;
let handle = scheduler::Handle::current();

Timeout {
value: future,
delay,
deadline: Some(deadline),
delay: None,
handle,
}
}

Expand All @@ -162,13 +158,23 @@ pin_project! {
#[pin]
value: T,
#[pin]
delay: Sleep,
delay: Option<Sleep>,
deadline : Option<Instant>,
handle: crate::runtime::scheduler::Handle,
}
}

impl<T> Timeout<T> {
pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
Timeout { value, delay }
pub(crate) fn new_with_delay(value: T, deadline: Option<Instant>) -> Timeout<T> {
use crate::runtime::scheduler;
let handle = scheduler::Handle::current();

Timeout {
value,
deadline,
delay: None,
handle,
}
}

/// Gets a reference to the underlying value in this timeout.
Expand All @@ -194,7 +200,7 @@ where
type Output = Result<T::Output, Elapsed>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let mut me = self.project();

let had_budget_before = coop::has_budget_remaining();

Expand All @@ -205,10 +211,25 @@ where

let has_budget_now = coop::has_budget_remaining();

// If the above inner future is ready, the below code will not be executed.
// This lazy initiation is for performance purposes,
// it can avoid unnecessary of `Sleep` creation and drop.
if me.delay.is_none() {
let location = trace::caller_location();
let delay = match me.deadline {
Some(deadline) => {
Sleep::new_timeout_with_handle(*deadline, location, me.handle.clone())
}
None => Sleep::far_future(location),
};
me.delay.as_mut().set(Some(delay));
}

let delay = me.delay;

let poll_delay = || -> Poll<Self::Output> {
match delay.poll(cx) {
// Safety: we have just assigned it a value of `Some`.
match delay.as_pin_mut().unwrap().poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
}
Expand Down

0 comments on commit bde0742

Please sign in to comment.