Skip to content

Commit

Permalink
Add a test to ensure tasks are balanced across workers
Browse files Browse the repository at this point in the history
  • Loading branch information
AzureMarker committed Dec 21, 2021
1 parent 16e1521 commit 44ab28e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 12 deletions.
2 changes: 1 addition & 1 deletion tokio-util/src/task/spawn_pinned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl LocalPool {
// a join handle.
worker.runtime_handle.spawn(async move {
// Move the job guard into the task
let _ = job_guard;
let _job_guard = job_guard;

// Inside the future we can't run spawn_local yet because we're not
// in the context of a LocalSet. We need to send create_task to the
Expand Down
62 changes: 51 additions & 11 deletions tokio-util/tests/spawn_pinned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio_util::task;

// Simple test of running a !Send future via spawn_pinned
/// Simple test of running a !Send future via spawn_pinned
#[tokio::test]
async fn can_spawn_not_send_future() {
let pool = task::LocalPoolHandle::new(1);
Expand All @@ -24,7 +24,7 @@ async fn can_spawn_not_send_future() {
assert_eq!(output, "test");
}

// Dropping the join handle still lets the task execute
/// Dropping the join handle still lets the task execute
#[test]
fn can_drop_future_and_still_get_output() {
let pool = task::LocalPoolHandle::new(1);
Expand All @@ -49,7 +49,7 @@ fn cannot_create_zero_sized_pool() {
let _pool = task::LocalPoolHandle::new(0);
}

// We should be able to spawn multiple futures onto the pool at the same time.
/// We should be able to spawn multiple futures onto the pool at the same time.
#[tokio::test]
async fn can_spawn_multiple_futures() {
let pool = task::LocalPoolHandle::new(2);
Expand All @@ -67,8 +67,8 @@ async fn can_spawn_multiple_futures() {
assert_eq!(join_handle2.await.unwrap(), "test2");
}

// A panic in the spawned task causes the join handle to return an error.
// But, you can continue to spawn tasks.
/// A panic in the spawned task causes the join handle to return an error.
/// But, you can continue to spawn tasks.
#[tokio::test]
async fn task_panic_propagates() {
let pool = task::LocalPoolHandle::new(1);
Expand All @@ -89,8 +89,8 @@ async fn task_panic_propagates() {
assert_eq!(result.unwrap(), "test");
}

// A panic during task creation causes the join handle to return an error.
// But, you can continue to spawn tasks.
/// A panic during task creation causes the join handle to return an error.
/// But, you can continue to spawn tasks.
#[tokio::test]
async fn callback_panic_does_not_kill_worker() {
let pool = task::LocalPoolHandle::new(1);
Expand All @@ -113,10 +113,10 @@ async fn callback_panic_does_not_kill_worker() {
assert_eq!(result.unwrap(), "test");
}

// Canceling the task via the returned join handle cancels the spawned task
// (which has a different, internal join handle).
// This test is loosely based off of `test_abort_wakes_task_3964` in
// `tokio/tests/test_abort.rs`
/// Canceling the task via the returned join handle cancels the spawned task
/// (which has a different, internal join handle).
/// This test is loosely based off of `test_abort_wakes_task_3964` in
/// `tokio/tests/test_abort.rs`
#[tokio::test]
async fn task_cancellation_propagates() {
let pool = task::LocalPoolHandle::new(1);
Expand Down Expand Up @@ -144,3 +144,43 @@ async fn task_cancellation_propagates() {
// was canceled as well.
assert!(weak_notify_dropped.upgrade().is_none());
}

/// Tasks should be given to the least burdened worker. When spawning two tasks
/// on a pool with two empty workers the tasks should be spawned on separate
/// workers.
#[tokio::test]
async fn tasks_are_balanced() {
let pool = task::LocalPoolHandle::new(2);

// Spawn a task so one thread has a task count of 1
let (start_sender1, start_receiver1) = tokio::sync::oneshot::channel();
let (end_sender1, end_receiver1) = tokio::sync::oneshot::channel();
let join_handle1 = pool.spawn_pinned(|| async move {
let _ = start_sender1.send(());
let _ = end_receiver1.await;
std::thread::current().id()
});

// Wait for the first task to start up
let _ = start_receiver1.await;

// This task should be spawned on the other thread
let (start_sender2, start_receiver2) = tokio::sync::oneshot::channel();
let join_handle2 = pool.spawn_pinned(|| async move {
let _ = start_sender2.send(());
std::thread::current().id()
});

// Wait for the second task to start up
let _ = start_receiver2.await;

// Allow the first task to end
let _ = end_sender1.send(());

let thread_id1 = join_handle1.await.unwrap();
let thread_id2 = join_handle2.await.unwrap();

println!("{:?} {:?}", thread_id1, thread_id2);

assert_ne!(thread_id1, thread_id2);
}

0 comments on commit 44ab28e

Please sign in to comment.