Skip to content

Commit

Permalink
metrics: fix flaky injection_queue_depth test (#6559)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed May 14, 2024
1 parent 227979f commit d085260
Showing 1 changed file with 18 additions and 28 deletions.
46 changes: 18 additions & 28 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

Expand Down Expand Up @@ -504,7 +504,7 @@ fn worker_overflow_count() {
}

#[test]
fn injection_queue_depth() {
fn injection_queue_depth_current_thread() {
use std::thread;

let rt = current_thread();
Expand All @@ -518,44 +518,34 @@ fn injection_queue_depth() {
.unwrap();

assert_eq!(1, metrics.injection_queue_depth());
}

#[test]
fn injection_queue_depth_multi_thread() {
let rt = threaded();
let handle = rt.handle().clone();
let metrics = rt.metrics();

// First we need to block the runtime workers
let (tx1, rx1) = std::sync::mpsc::channel();
let (tx2, rx2) = std::sync::mpsc::channel();
let (tx3, rx3) = std::sync::mpsc::channel();
let rx3 = Arc::new(Mutex::new(rx3));
let barrier1 = Arc::new(Barrier::new(3));
let barrier2 = Arc::new(Barrier::new(3));

rt.spawn(async move { rx1.recv().unwrap() });
rt.spawn(async move { rx2.recv().unwrap() });

// Spawn some more to make sure there are items
for _ in 0..10 {
let rx = rx3.clone();
// Spawn a task per runtime worker to block it.
for _ in 0..2 {
let barrier1 = barrier1.clone();
let barrier2 = barrier2.clone();
rt.spawn(async move {
rx.lock().unwrap().recv().unwrap();
barrier1.wait();
barrier2.wait();
});
}

thread::spawn(move || {
handle.spawn(async {});
})
.join()
.unwrap();
barrier1.wait();

let n = metrics.injection_queue_depth();
assert!(1 <= n, "{}", n);
assert!(15 >= n, "{}", n);

for _ in 0..10 {
tx3.send(()).unwrap();
for i in 0..10 {
assert_eq!(i, metrics.injection_queue_depth());
rt.spawn(async {});
}

tx1.send(()).unwrap();
tx2.send(()).unwrap();
barrier2.wait();
}

#[test]
Expand Down

0 comments on commit d085260

Please sign in to comment.