diff --git a/benches/mpsc.rs b/benches/mpsc.rs index 49bd3cc0434..ec07ad8f8bf 100644 --- a/benches/mpsc.rs +++ b/benches/mpsc.rs @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) { } fn contention_bounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) { } fn contention_bounded_full(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) { } fn contention_unbounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) { } fn uncontented_bounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) { } fn uncontented_unbounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() diff --git a/benches/scheduler.rs b/benches/scheduler.rs index 0562a1201a7..801de72a553 100644 --- a/benches/scheduler.rs +++ b/benches/scheduler.rs @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc}; fn spawn_many(b: &mut Bencher) { const NUM_SPAWN: usize = 10_000; - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) { fn ping_pong(b: &mut Bencher) { const NUM_PINGS: usize = 1_000; - let mut rt = rt(); + let rt = rt(); let (done_tx, done_rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) { fn chained_spawn(b: &mut Bencher) { const ITER: usize = 1_000; - let mut rt = rt(); + let rt = rt(); fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { if n == 0 { diff --git a/benches/spawn.rs b/benches/spawn.rs index 9122c7b1534..122c11c408b 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -10,7 +10,7 @@ async fn work() -> usize { } fn basic_scheduler_local_spawn(bench: &mut Bencher) { - let mut runtime = tokio::runtime::Builder::new() + let runtime = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) { } fn threaded_scheduler_local_spawn(bench: &mut Bencher) { - let mut runtime = tokio::runtime::Builder::new() + let runtime = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); diff --git a/benches/sync_rwlock.rs b/benches/sync_rwlock.rs index 4eca9807b2e..30c66e49394 100644 --- a/benches/sync_rwlock.rs +++ b/benches/sync_rwlock.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::{sync::RwLock, task}; fn read_uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) { } fn read_concurrent_uncontended_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) { } fn read_concurrent_uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) { } fn read_concurrent_contended_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) { } fn read_concurrent_contended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); diff --git a/benches/sync_semaphore.rs b/benches/sync_semaphore.rs index c43311c0d35..32d4aa2b50e 100644 --- a/benches/sync_semaphore.rs +++ b/benches/sync_semaphore.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::{sync::Semaphore, task}; fn uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -27,7 +27,7 @@ async fn task(s: Arc) { } fn uncontended_concurrent_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) { } fn uncontended_concurrent_single(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) { } fn contended_concurrent_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) { } fn contended_concurrent_single(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); diff --git a/tests-integration/tests/rt_shell.rs b/tests-integration/tests/rt_shell.rs index 392c05196d0..012f44a71b3 100644 --- a/tests-integration/tests/rt_shell.rs +++ b/tests-integration/tests/rt_shell.rs @@ -18,7 +18,7 @@ fn basic_shell_rt() { }); for _ in 0..1_000 { - let mut rt = runtime::Builder::new().build().unwrap(); + let rt = runtime::Builder::new().build().unwrap(); let (tx, rx) = oneshot::channel(); diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 35f2a9b02b2..d8c74671ac3 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -28,7 +28,7 @@ pub mod task; pub fn block_on(future: F) -> F::Output { use tokio::runtime; - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .basic_scheduler() .enable_all() .build() diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 0b36a75f655..bda4884d8d0 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -12,24 +12,25 @@ cfg_blocking_impl! { pub(crate) mod task; use crate::runtime::Builder; + use crate::loom::sync::Arc; - pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { - BlockingPool::new(builder, thread_cap) - + pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> Arc { + Arc::new(BlockingPool::new(builder, thread_cap)) } } cfg_not_blocking_impl! { use crate::runtime::Builder; use std::time::Duration; + use crate::loom::sync::Arc; #[derive(Debug, Clone)] pub(crate) struct BlockingPool {} pub(crate) use BlockingPool as Spawner; - pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { - BlockingPool {} + pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> Arc { + Arc::new(BlockingPool {}) } impl BlockingPool { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index fad72c7ad94..96b921e6275 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,10 +1,9 @@ +use crate::loom::sync::{Arc, Mutex}; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; use std::fmt; -#[cfg(not(loom))] -use std::sync::Arc; /// Builds Tokio Runtime with custom configuration values. /// @@ -263,7 +262,7 @@ impl Builder { where F: Fn() + Send + Sync + 'static, { - self.after_start = Some(Arc::new(f)); + self.after_start = Some(std::sync::Arc::new(f)); self } @@ -303,7 +302,7 @@ impl Builder { /// ``` /// use tokio::runtime::Builder; /// - /// let mut rt = Builder::new().build().unwrap(); + /// let rt = Builder::new().build().unwrap(); /// /// rt.block_on(async { /// println!("Hello from the Tokio runtime"); @@ -334,7 +333,7 @@ impl Builder { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Shell(Shell::new(driver)), + kind: Kind::Shell(Arc::new(Mutex::new(Shell::new(driver)))), handle: Handle { spawner, io_handle, @@ -433,7 +432,7 @@ cfg_rt_core! { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Basic(scheduler), + kind: Kind::Basic(Arc::new(Mutex::new(scheduler))), handle: Handle { spawner, io_handle, @@ -493,7 +492,7 @@ cfg_rt_threaded! { handle.enter(|| launch.launch()); Ok(Runtime { - kind: Kind::ThreadPool(scheduler), + kind: Kind::ThreadPool(Arc::new(scheduler)), handle, blocking_pool, }) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 0716a7fadca..f5025d04ffe 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -244,7 +244,7 @@ cfg_rt_core! { /// use std::thread; /// /// // Create the runtime. - /// let mut rt = Builder::new() + /// let rt = Builder::new() /// .enable_all() /// .basic_scheduler() /// .build() diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 637f38cabb5..0d7190e300a 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -69,7 +69,7 @@ //! //! fn main() -> Result<(), Box> { //! // Create the runtime -//! let mut rt = Runtime::new()?; +//! let rt = Runtime::new()?; //! //! // Spawn the root task //! rt.block_on(async { @@ -240,6 +240,7 @@ cfg_rt_core! { use crate::task::JoinHandle; } +use crate::loom::sync::{Arc, Mutex}; use std::future::Future; use std::time::Duration; @@ -271,7 +272,7 @@ use std::time::Duration; /// [`new`]: method@Self::new /// [`Builder`]: struct@Builder /// [`tokio::run`]: fn@run -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Runtime { /// Task executor kind: Kind, @@ -280,23 +281,23 @@ pub struct Runtime { handle: Handle, /// Blocking pool handle, used to signal shutdown - blocking_pool: BlockingPool, + blocking_pool: Arc, } /// The runtime executor is either a thread-pool or a current-thread executor. -#[derive(Debug)] +#[derive(Debug, Clone)] enum Kind { /// Not able to execute concurrent tasks. This variant is mostly used to get /// access to the driver handles. - Shell(Shell), + Shell(Arc>), /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(BasicScheduler), + Basic(Arc>>), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] - ThreadPool(ThreadPool), + ThreadPool(Arc), } /// After thread starts / before thread stops @@ -397,7 +398,10 @@ impl Runtime { Kind::Shell(_) => panic!("task execution disabled"), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(exec) => exec.spawn(future), + Kind::Basic(exec) => { + let exec = exec.lock().unwrap(); + exec.spawn(future) + } } } @@ -426,7 +430,7 @@ impl Runtime { /// use tokio::runtime::Runtime; /// /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// /// // Execute the future, blocking the current thread until completion /// rt.block_on(async { @@ -435,13 +439,17 @@ impl Runtime { /// ``` /// /// [handle]: fn@Handle::block_on - pub fn block_on(&mut self, future: F) -> F::Output { - let kind = &mut self.kind; - - self.handle.enter(|| match kind { - Kind::Shell(exec) => exec.block_on(future), + pub fn block_on(&self, future: F) -> F::Output { + self.handle.enter(|| match &self.kind { + Kind::Shell(exec) => { + let mut exec = exec.lock().unwrap(); + exec.block_on(future) + } #[cfg(feature = "rt-core")] - Kind::Basic(exec) => exec.block_on(future), + Kind::Basic(exec) => { + let mut exec = exec.lock().unwrap(); + exec.block_on(future) + } #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.block_on(future), }) @@ -531,7 +539,7 @@ impl Runtime { /// use std::time::Duration; /// /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); + /// let runtime = Runtime::new().unwrap(); /// /// runtime.block_on(async move { /// task::spawn_blocking(move || { @@ -545,7 +553,11 @@ impl Runtime { pub fn shutdown_timeout(mut self, duration: Duration) { // Wakeup and shutdown all the worker threads self.handle.spawner.shutdown(); - self.blocking_pool.shutdown(Some(duration)); + + // TODO: this is likely incorrect, we need to find some way to synchronize shutting down. + if let Some(blocking_pool) = Arc::get_mut(&mut self.blocking_pool) { + blocking_pool.shutdown(Some(duration)); + } } /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. @@ -565,7 +577,7 @@ impl Runtime { /// use tokio::runtime::Runtime; /// /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); + /// let runtime = Runtime::new().unwrap(); /// /// runtime.block_on(async move { /// let inner_runtime = Runtime::new().unwrap(); diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index c08658cde87..e8ea731a7ed 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -178,7 +178,7 @@ mod group_b { #[test] fn join_output() { loom::model(|| { - let mut rt = mk_pool(1); + let rt = mk_pool(1); rt.block_on(async { let t = crate::spawn(track(async { "hello" })); @@ -192,7 +192,7 @@ mod group_b { #[test] fn poll_drop_handle_then_drop() { loom::model(|| { - let mut rt = mk_pool(1); + let rt = mk_pool(1); rt.block_on(async move { let mut t = crate::spawn(track(async { "hello" })); diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 50edd2b6c48..9e55a5e60e0 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -185,7 +185,7 @@ mod tests { #[test] fn smoke() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let registry = Registry::new(vec![ EventInfo::default(), @@ -247,7 +247,7 @@ mod tests { #[test] fn broadcast_cleans_up_disconnected_listeners() { - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); rt.block_on(async { let registry = Registry::new(vec![EventInfo::default()]); diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index f55e504b00f..b3f495b6111 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -272,7 +272,7 @@ mod tests { #[test] fn ctrl_break() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let mut ctrl_break = assert_ok!(super::ctrl_break()); diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3c409edfb90..a56453df401 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -312,9 +312,9 @@ impl LocalSet { /// use tokio::runtime::Runtime; /// use tokio::task; /// - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// let local = task::LocalSet::new(); - /// local.block_on(&mut rt, async { + /// local.block_on(&rt, async { /// let join = task::spawn_local(async { /// let blocking_result = task::block_in_place(|| { /// // ... @@ -329,9 +329,9 @@ impl LocalSet { /// use tokio::runtime::Runtime; /// use tokio::task; /// - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// let local = task::LocalSet::new(); - /// local.block_on(&mut rt, async { + /// local.block_on(&rt, async { /// let join = task::spawn_local(async { /// let blocking_result = task::spawn_blocking(|| { /// // ... @@ -346,7 +346,7 @@ impl LocalSet { /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on /// [in-place blocking]: fn@crate::task::block_in_place /// [`spawn_blocking`]: fn@crate::task::spawn_blocking - pub fn block_on(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output + pub fn block_on(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future, { diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs index b85abd8c2ac..d4f4f8d48cf 100644 --- a/tokio/tests/io_driver.rs +++ b/tokio/tests/io_driver.rs @@ -45,7 +45,7 @@ fn test_drop_on_notify() { // shutting down. Then, when the task handle is dropped, the task itself is // dropped. - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .basic_scheduler() .enable_all() .build() diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 0885992d7d2..3813c48006c 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -12,7 +12,7 @@ use std::time::Duration; fn spawned_task_does_not_progress_without_block_on() { let (tx, mut rx) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { assert_ok!(tx.send("hello")); @@ -65,7 +65,7 @@ fn no_extra_poll() { }; let npolls = Arc::clone(&rx.npolls); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { while rx.next().await.is_some() {} }); rt.block_on(async { @@ -100,7 +100,7 @@ fn acquire_mutex_in_drop() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { let _ = rx2.await; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 4211a66705e..745ef3c53b3 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -72,7 +72,7 @@ rt_test! { #[test] fn block_on_sync() { - let mut rt = rt(); + let rt = rt(); let mut win = false; rt.block_on(async { @@ -96,7 +96,7 @@ rt_test! { #[test] fn block_on_async() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -132,7 +132,7 @@ rt_test! { #[test] fn spawn_one_bg() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -149,7 +149,7 @@ rt_test! { #[test] fn spawn_one_join() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -172,7 +172,7 @@ rt_test! { #[test] fn spawn_two() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -199,7 +199,7 @@ rt_test! { const ITER: usize = 200; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (done_tx, mut done_rx) = mpsc::unbounded_channel(); @@ -249,7 +249,7 @@ rt_test! { const ITER: usize = 500; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { tokio::spawn(async move { @@ -305,7 +305,7 @@ rt_test! { #[test] fn spawn_await_chain() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { assert_ok!(tokio::spawn(async { @@ -320,7 +320,7 @@ rt_test! { #[test] fn outstanding_tasks_dropped() { - let mut rt = rt(); + let rt = rt(); let cnt = Arc::new(()); @@ -343,16 +343,16 @@ rt_test! { #[test] #[should_panic] fn nested_rt() { - let mut rt1 = rt(); - let mut rt2 = rt(); + let rt1 = rt(); + let rt2 = rt(); rt1.block_on(async { rt2.block_on(async { "hello" }) }); } #[test] fn create_rt_in_block_on() { - let mut rt1 = rt(); - let mut rt2 = rt1.block_on(async { rt() }); + let rt1 = rt(); + let rt2 = rt1.block_on(async { rt() }); let out = rt2.block_on(async { "ZOMG" }); assert_eq!(out, "ZOMG"); @@ -360,7 +360,7 @@ rt_test! { #[test] fn complete_block_on_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -383,7 +383,7 @@ rt_test! { #[test] fn complete_task_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -412,7 +412,7 @@ rt_test! { #[test] fn spawn_from_other_thread_idle() { - let mut rt = rt(); + let rt = rt(); let handle = rt.handle().clone(); let (tx, rx) = oneshot::channel(); @@ -432,7 +432,7 @@ rt_test! { #[test] fn spawn_from_other_thread_under_load() { - let mut rt = rt(); + let rt = rt(); let handle = rt.handle().clone(); let (tx, rx) = oneshot::channel(); @@ -457,7 +457,7 @@ rt_test! { #[test] fn delay_at_root() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -471,7 +471,7 @@ rt_test! { #[test] fn delay_in_spawn() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -492,7 +492,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -512,7 +512,7 @@ rt_test! { #[test] fn spawn_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -527,7 +527,7 @@ rt_test! { #[test] fn spawn_blocking_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -542,7 +542,7 @@ rt_test! { #[test] fn delay_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { assert_ok!(tokio::task::spawn_blocking(|| { @@ -562,7 +562,7 @@ rt_test! { #[test] fn socket_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); @@ -615,7 +615,7 @@ rt_test! { // test is disabled. #[cfg(not(windows))] fn io_driver_called_when_under_load() { - let mut rt = rt(); + let rt = rt(); // Create a lot of constant load. The scheduler will always be busy. for _ in 0..100 { @@ -651,7 +651,7 @@ rt_test! { #[test] fn client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async move { client_server(tx).await }); @@ -662,7 +662,7 @@ rt_test! { #[test] fn panic_in_task() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = oneshot::channel(); struct Boom(Option>); @@ -689,7 +689,7 @@ rt_test! { #[test] #[should_panic] fn panic_in_block_on() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { panic!() }); } @@ -709,7 +709,7 @@ rt_test! { #[test] fn enter_and_spawn() { - let mut rt = rt(); + let rt = rt(); let handle = rt.enter(|| { tokio::spawn(async {}) }); @@ -739,7 +739,7 @@ rt_test! { } } - let mut rt = rt(); + let rt = rt(); let (drop_tx, drop_rx) = mpsc::channel(); let (run_tx, run_rx) = oneshot::channel(); @@ -775,7 +775,7 @@ rt_test! { let (tx2, rx2) = oneshot::channel(); let (tx3, rx3) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); let h1 = rt.handle().clone(); @@ -823,7 +823,7 @@ rt_test! { use std::net::Ipv6Addr; for _ in 1..10 { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); @@ -854,7 +854,7 @@ rt_test! { #[test] fn shutdown_timeout() { let (tx, rx) = oneshot::channel(); - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { task::spawn_blocking(move || { @@ -870,7 +870,7 @@ rt_test! { #[test] fn shutdown_wakeup_time() { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { tokio::time::delay_for(std::time::Duration::from_millis(100)).await; @@ -927,10 +927,10 @@ rt_test! { #[test] fn local_set_block_on_socket() { - let mut rt = rt(); + let rt = rt(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -948,12 +948,12 @@ rt_test! { #[test] fn local_set_client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { client_server_local(tx).await }); + local.block_on(&rt, async move { client_server_local(tx).await }); assert_ok!(rx.try_recv()); assert_err!(rx.try_recv()); @@ -987,7 +987,7 @@ rt_test! { fn coop() { use std::task::Poll::Ready; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { // Create a bunch of tasks @@ -1019,7 +1019,7 @@ rt_test! { const NUM: usize = 100; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index b5ec96dec35..a67c090ebf4 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -57,22 +57,20 @@ fn many_oneshot_futures() { } #[test] fn many_multishot_futures() { - use tokio::sync::mpsc; - const CHAIN: usize = 200; const CYCLES: usize = 5; const TRACKS: usize = 50; for _ in 0..50 { - let mut rt = rt(); + let rt = rt(); let mut start_txs = Vec::with_capacity(TRACKS); let mut final_rxs = Vec::with_capacity(TRACKS); for _ in 0..TRACKS { - let (start_tx, mut chain_rx) = mpsc::channel(10); + let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10); for _ in 0..CHAIN { - let (mut next_tx, next_rx) = mpsc::channel(10); + let (mut next_tx, next_rx) = tokio::sync::mpsc::channel(10); // Forward all the messages rt.spawn(async move { @@ -85,7 +83,7 @@ fn many_multishot_futures() { } // This final task cycles if needed - let (mut final_tx, final_rx) = mpsc::channel(10); + let (mut final_tx, final_rx) = tokio::sync::mpsc::channel(10); let mut cycle_tx = start_tx.clone(); let mut rem = CYCLES; @@ -123,7 +121,7 @@ fn many_multishot_futures() { #[test] fn spawn_shutdown() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async { @@ -230,7 +228,7 @@ fn start_stop_callbacks_called() { let after_inner = after_start.clone(); let before_inner = before_stop.clone(); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .threaded_scheduler() .enable_all() .on_thread_start(move || { @@ -331,9 +329,7 @@ fn multi_threadpool() { // channel yields occasionally even if there are values ready to receive. #[test] fn coop_and_block_in_place() { - use tokio::sync::mpsc; - - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .threaded_scheduler() // Setting max threads to 1 prevents another thread from claiming the // runtime worker yielded as part of `block_in_place` and guarantees the @@ -344,7 +340,7 @@ fn coop_and_block_in_place() { .unwrap(); rt.block_on(async move { - let (mut tx, mut rx) = mpsc::channel(1024); + let (mut tx, mut rx) = tokio::sync::mpsc::channel(1024); // Fill the channel for _ in 0..1024 { diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs index aeedd96e4e6..709e0d41834 100644 --- a/tokio/tests/signal_drop_rt.rs +++ b/tokio/tests/signal_drop_rt.rs @@ -14,11 +14,11 @@ use tokio::signal::unix::{signal, SignalKind}; fn dropping_loops_does_not_cause_starvation() { let kind = SignalKind::user_defined1(); - let mut first_rt = rt(); + let first_rt = rt(); let mut first_signal = first_rt.block_on(async { signal(kind).expect("failed to register first signal") }); - let mut second_rt = rt(); + let second_rt = rt(); let mut second_signal = second_rt.block_on(async { signal(kind).expect("failed to register second signal") }); diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs index 9d78469578c..78319a75331 100644 --- a/tokio/tests/signal_multi_rt.rs +++ b/tokio/tests/signal_multi_rt.rs @@ -24,7 +24,7 @@ fn multi_loop() { .map(|_| { let sender = sender.clone(); thread::spawn(move || { - let mut rt = rt(); + let rt = rt(); let _ = rt.block_on(async { let mut signal = signal(SignalKind::hangup()).unwrap(); sender.send(()).unwrap(); diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 4ca1596e052..6cb11584b4a 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -79,7 +79,7 @@ async fn no_block_in_basic_scheduler() { #[test] fn yes_block_in_threaded_block_on() { - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -91,7 +91,7 @@ fn yes_block_in_threaded_block_on() { #[test] #[should_panic] fn no_block_in_basic_block_on() { - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); rt.block_on(async { task::block_in_place(|| {}); }); @@ -99,14 +99,14 @@ fn no_block_in_basic_block_on() { #[test] fn can_enter_basic_rt_from_within_block_in_place() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); outer.block_on(async { tokio::task::block_in_place(|| { - let mut inner = tokio::runtime::Builder::new() + let inner = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -120,7 +120,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { fn useful_panic_message_when_dropping_rt_in_rt() { use std::panic::{catch_unwind, AssertUnwindSafe}; - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -147,7 +147,7 @@ fn useful_panic_message_when_dropping_rt_in_rt() { #[test] fn can_shutdown_with_zero_timeout_in_runtime() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -163,7 +163,7 @@ fn can_shutdown_with_zero_timeout_in_runtime() { #[test] fn can_shutdown_now_in_runtime() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -179,7 +179,7 @@ fn can_shutdown_now_in_runtime() { #[test] fn coop_disabled_in_block_in_place() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .enable_time() .build() @@ -213,7 +213,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { let (done_tx, done_rx) = std::sync::mpsc::channel(); let done = done_tx.clone(); thread::spawn(move || { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index bf80b8ee5f5..23e92586b78 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -133,12 +133,12 @@ fn local_threadpool_blocking_in_place() { ON_RT_THREAD.with(|cell| cell.set(true)); - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .enable_all() .build() .unwrap(); - LocalSet::new().block_on(&mut rt, async { + LocalSet::new().block_on(&rt, async { assert!(ON_RT_THREAD.with(|cell| cell.get())); let join = task::spawn_local(async move { assert!(ON_RT_THREAD.with(|cell| cell.get())); @@ -246,12 +246,12 @@ fn join_local_future_elsewhere() { ON_RT_THREAD.with(|cell| cell.set(true)); - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); let local = LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let join = task::spawn_local(async move { println!("hello world running..."); @@ -286,7 +286,7 @@ fn drop_cancels_tasks() { use std::rc::Rc; // This test reproduces issue #1842 - let mut rt = rt(); + let rt = rt(); let rc1 = Rc::new(()); let rc2 = rc1.clone(); @@ -303,7 +303,7 @@ fn drop_cancels_tasks() { } }); - local.block_on(&mut rt, async { + local.block_on(&rt, async { started_rx.await.unwrap(); }); drop(local); @@ -362,11 +362,11 @@ fn drop_cancels_remote_tasks() { with_timeout(Duration::from_secs(60), || { let (tx, mut rx) = mpsc::channel::<()>(1024); - let mut rt = rt(); + let rt = rt(); let local = LocalSet::new(); local.spawn_local(async move { while rx.recv().await.is_some() {} }); - local.block_on(&mut rt, async { + local.block_on(&rt, async { time::delay_for(Duration::from_millis(1)).await; }); @@ -385,7 +385,7 @@ fn local_tasks_wake_join_all() { use futures::future::join_all; use tokio::task::LocalSet; - let mut rt = rt(); + let rt = rt(); let set = LocalSet::new(); let mut handles = Vec::new(); diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs index b739f1b2f68..19bcd27d9b5 100644 --- a/tokio/tests/time_rt.rs +++ b/tokio/tests/time_rt.rs @@ -28,7 +28,7 @@ fn timer_with_threaded_runtime() { fn timer_with_basic_scheduler() { use tokio::runtime::Builder; - let mut rt = Builder::new() + let rt = Builder::new() .basic_scheduler() .enable_all() .build()