From b158b05df583348c19029c17924b9c898a9e74ef Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 8 Mar 2024 11:15:22 +0100 Subject: [PATCH] test(subscriber): test with custom `self_wake()` function (#525) Part of the testing performed in the `console-subscriber` integration tests is detecting self wakes. This relied upon the `yield_now()` from Tokio. However, the behavior of this function was changed in tokio-rs/tokio#5223 and since Tokio 1.23 the wake doesn't occur in the task that `yield_now()` is called from. This breaks the test when using a newer version of Tokio. This change replaces the use of `yield_now()` with a custom `self_wake()` function that returns a future which does perform a self wake (wakes the task from within itself before returning `Poll::Pending`). The same custom `self_wake()` is also included in the `app` example so that it shows self wakes correctly. Tokio has been updated to 1.28.2 in the lock file (the last with compatible MSRV) so that this fix is tested. Ref #512 --- Cargo.lock | 14 ++++---- console-subscriber/examples/app.rs | 30 ++++++++++++++-- console-subscriber/tests/framework.rs | 2 +- console-subscriber/tests/support/mod.rs | 48 +++++++++++++++++++++++-- console-subscriber/tests/wake.rs | 4 +-- 5 files changed, 82 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37beaef76..7a379ceec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1731,24 +1731,22 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.0" +version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89797afd69d206ccd11fb0ea560a44bbb87731d020670e79416d442919257d42" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", "bytes", "libc", - "memchr", "mio", "num_cpus", - "once_cell", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", "tracing", - "winapi", + "windows-sys 0.48.0", ] [[package]] @@ -1792,13 +1790,13 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.7.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 1.0.90", + "syn 2.0.33", ] [[package]] diff --git a/console-subscriber/examples/app.rs b/console-subscriber/examples/app.rs index 93c262c2a..7fdfe2a57 100644 --- a/console-subscriber/examples/app.rs +++ b/console-subscriber/examples/app.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{future::Future, task::Poll, time::Duration}; static HELP: &str = r#" Example console-instrumented app @@ -121,7 +121,7 @@ async fn burn(min: u64, max: u64) { loop { for i in min..max { for _ in 0..i { - tokio::task::yield_now().await; + self_wake().await; } tokio::time::sleep(Duration::from_secs(i - min)).await; } @@ -152,3 +152,29 @@ async fn spawn_blocking(seconds: u64) { .await; } } + +fn self_wake() -> impl Future { + struct SelfWake { + yielded: bool, + } + + impl Future for SelfWake { + type Output = (); + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + if self.yielded == true { + return Poll::Ready(()); + } + + self.yielded = true; + cx.waker().wake_by_ref(); + + Poll::Pending + } + } + + SelfWake { yielded: false } +} diff --git a/console-subscriber/tests/framework.rs b/console-subscriber/tests/framework.rs index 777d49c06..8b96cc141 100644 --- a/console-subscriber/tests/framework.rs +++ b/console-subscriber/tests/framework.rs @@ -63,7 +63,7 @@ fn self_wakes() { .match_default_name() .expect_self_wakes(1); - let future = async { task::yield_now().await }; + let future = async { support::self_wake().await }; assert_task(expected_task, future); } diff --git a/console-subscriber/tests/support/mod.rs b/console-subscriber/tests/support/mod.rs index 3a42583a2..460406a65 100644 --- a/console-subscriber/tests/support/mod.rs +++ b/console-subscriber/tests/support/mod.rs @@ -1,14 +1,14 @@ -use futures::Future; +use std::{future::Future, task::Poll}; + +use tokio::task::JoinHandle; mod state; mod subscriber; mod task; use subscriber::run_test; - pub(crate) use subscriber::MAIN_TASK_NAME; pub(crate) use task::ExpectedTask; -use tokio::task::JoinHandle; /// Assert that an `expected_task` is recorded by a console-subscriber /// when driving the provided `future` to completion. @@ -62,3 +62,45 @@ where .spawn(f) .expect(&format!("spawning task '{name}' failed")) } + +/// Wakes itself from within this task. +/// +/// This function returns a future which will wake itself and then +/// return `Poll::Pending` the first time it is called. The next time +/// it will return `Poll::Ready`. +/// +/// This is the old behavior of Tokio's [`yield_now()`] function, before it +/// was improved in [tokio-rs/tokio#5223] to avoid starving the resource +/// drivers. +/// +/// Awaiting the future returned from this function will result in a +/// self-wake being recorded. +/// +/// [`yield_now()`]: fn@tokio::task::yield_now +/// [tokio-rs/tokio#5223]: https://github.com/tokio-rs/tokio/pull/5223 +#[allow(dead_code)] +pub(crate) fn self_wake() -> impl Future { + struct SelfWake { + yielded: bool, + } + + impl Future for SelfWake { + type Output = (); + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + if self.yielded == true { + return Poll::Ready(()); + } + + self.yielded = true; + cx.waker().wake_by_ref(); + + Poll::Pending + } + } + + SelfWake { yielded: false } +} diff --git a/console-subscriber/tests/wake.rs b/console-subscriber/tests/wake.rs index e64e87a6e..0589f17bb 100644 --- a/console-subscriber/tests/wake.rs +++ b/console-subscriber/tests/wake.rs @@ -2,7 +2,7 @@ mod support; use std::time::Duration; use support::{assert_task, ExpectedTask}; -use tokio::{task, time::sleep}; +use tokio::time::sleep; #[test] fn sleep_wakes() { @@ -41,7 +41,7 @@ fn self_wake() { .expect_self_wakes(1); let future = async { - task::yield_now().await; + support::self_wake().await; }; assert_task(expected_task, future);