diff --git a/benches/mpsc.rs b/benches/mpsc.rs index 49bd3cc0434..ec07ad8f8bf 100644 --- a/benches/mpsc.rs +++ b/benches/mpsc.rs @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) { } fn contention_bounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) { } fn contention_bounded_full(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) { } fn contention_unbounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) { } fn uncontented_bounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) { } fn uncontented_unbounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() diff --git a/benches/scheduler.rs b/benches/scheduler.rs index 0562a1201a7..801de72a553 100644 --- a/benches/scheduler.rs +++ b/benches/scheduler.rs @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc}; fn spawn_many(b: &mut Bencher) { const NUM_SPAWN: usize = 10_000; - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) { fn ping_pong(b: &mut Bencher) { const NUM_PINGS: usize = 1_000; - let mut rt = rt(); + let rt = rt(); let (done_tx, done_rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) { fn chained_spawn(b: &mut Bencher) { const ITER: usize = 1_000; - let mut rt = rt(); + let rt = rt(); fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { if n == 0 { diff --git a/benches/spawn.rs b/benches/spawn.rs index 9122c7b1534..f76daf3fbaa 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -10,7 +10,7 @@ async fn work() -> usize { } fn basic_scheduler_local_spawn(bench: &mut Bencher) { - let mut runtime = tokio::runtime::Builder::new() + let runtime = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) { } fn threaded_scheduler_local_spawn(bench: &mut Bencher) { - let mut runtime = tokio::runtime::Builder::new() + let runtime = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) { .basic_scheduler() .build() .unwrap(); - let handle = runtime.handle(); + bench.iter(|| { - let h = handle.spawn(work()); + let h = runtime.spawn(work()); black_box(h); }); } @@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) { .threaded_scheduler() .build() .unwrap(); - let handle = runtime.handle(); + bench.iter(|| { - let h = handle.spawn(work()); + let h = runtime.spawn(work()); black_box(h); }); } diff --git a/benches/sync_rwlock.rs b/benches/sync_rwlock.rs index 4eca9807b2e..30c66e49394 100644 --- a/benches/sync_rwlock.rs +++ b/benches/sync_rwlock.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::{sync::RwLock, task}; fn read_uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) { } fn read_concurrent_uncontended_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) { } fn read_concurrent_uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) { } fn read_concurrent_contended_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) { } fn read_concurrent_contended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); diff --git a/benches/sync_semaphore.rs b/benches/sync_semaphore.rs index c43311c0d35..32d4aa2b50e 100644 --- a/benches/sync_semaphore.rs +++ b/benches/sync_semaphore.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::{sync::Semaphore, task}; fn uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -27,7 +27,7 @@ async fn task(s: Arc) { } fn uncontended_concurrent_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) { } fn uncontended_concurrent_single(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) { } fn contended_concurrent_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) { } fn contended_concurrent_single(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); diff --git a/tests-integration/tests/rt_shell.rs b/tests-integration/tests/rt_shell.rs index 392c05196d0..012f44a71b3 100644 --- a/tests-integration/tests/rt_shell.rs +++ b/tests-integration/tests/rt_shell.rs @@ -18,7 +18,7 @@ fn basic_shell_rt() { }); for _ in 0..1_000 { - let mut rt = runtime::Builder::new().build().unwrap(); + let rt = runtime::Builder::new().build().unwrap(); let (tx, rx) = oneshot::channel(); diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index d53a42ad456..cfbf80cee60 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -28,7 +28,7 @@ pub mod task; pub fn block_on(future: F) -> F::Output { use tokio::runtime; - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .basic_scheduler() .enable_all() .build() diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index f6289093299..e07538d9917 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -12,21 +12,21 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::runtime::Handle; +use tokio::runtime::Runtime; pin_project! { /// `TokioContext` allows connecting a custom executor with the tokio runtime. /// /// It contains a `Handle` to the runtime. A handle to the runtime can be /// obtain by calling the `Runtime::handle()` method. - pub struct TokioContext { + pub struct TokioContext<'a, F> { #[pin] inner: F, - handle: Handle, + handle: &'a Runtime, } } -impl Future for TokioContext { +impl Future for TokioContext<'_, F> { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -39,16 +39,16 @@ impl Future for TokioContext { } /// Trait extension that simplifies bundling a `Handle` with a `Future`. -pub trait HandleExt { +pub trait RuntimeExt { /// Convenience method that takes a Future and returns a `TokioContext`. /// /// # Example: calling Tokio Runtime from a custom ThreadPool /// /// ```no_run - /// use tokio_util::context::HandleExt; + /// use tokio_util::context::RuntimeExt; /// use tokio::time::{delay_for, Duration}; /// - /// let mut rt = tokio::runtime::Builder::new() + /// let rt = tokio::runtime::Builder::new() /// .threaded_scheduler() /// .enable_all() /// .build().unwrap(); @@ -61,18 +61,17 @@ pub trait HandleExt { /// /// rt.block_on( /// rt2 - /// .handle() /// .wrap(async { delay_for(Duration::from_millis(2)).await }), /// ); ///``` - fn wrap(&self, fut: F) -> TokioContext; + fn wrap(&self, fut: F) -> TokioContext<'_, F>; } -impl HandleExt for Handle { - fn wrap(&self, fut: F) -> TokioContext { +impl RuntimeExt for Runtime { + fn wrap(&self, fut: F) -> TokioContext<'_, F> { TokioContext { inner: fut, - handle: self.clone(), + handle: self, } } } diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 49038ddbdb4..2e39b14432a 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -2,11 +2,11 @@ use tokio::runtime::Builder; use tokio::time::*; -use tokio_util::context::HandleExt; +use tokio_util::context::RuntimeExt; #[test] fn tokio_context_with_another_runtime() { - let mut rt1 = Builder::new() + let rt1 = Builder::new() .threaded_scheduler() .core_threads(1) // no timer! @@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() { // Without the `HandleExt.wrap()` there would be a panic because there is // no timer running, since it would be referencing runtime r1. - let _ = rt1.block_on( - rt2.handle() - .wrap(async move { delay_for(Duration::from_millis(2)).await }), - ); + let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await })); } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 785968f43f8..9054c3b8733 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -173,7 +173,7 @@ where /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new(io: E) -> io::Result { PollEvented::new_with_ready(io, mio::Ready::all()) } @@ -201,7 +201,7 @@ where /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { let registration = Registration::new_with_ready(&io, ready)?; Ok(Self { diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 63aaff56d86..8206507280d 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -67,7 +67,7 @@ impl Registration { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new(io: &T) -> io::Result where T: Evented, @@ -104,7 +104,7 @@ impl Registration { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new_with_ready(io: &T, ready: mio::Ready) -> io::Result where T: Evented, diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index fd79b259b92..44945e387b4 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -262,7 +262,7 @@ impl TcpListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::TcpListener) -> io::Result { let io = mio::net::TcpListener::from_std(listener)?; let io = PollEvented::new(io)?; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e624fb9d954..e0348724cff 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -187,7 +187,7 @@ impl TcpStream { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_stream(stream)?; let io = PollEvented::new(io)?; diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 16e537739d4..f9d88372035 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -64,7 +64,7 @@ impl UdpSocket { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result { let io = mio::net::UdpSocket::from_socket(socket)?; let io = PollEvented::new(io)?; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 2282f96a3f6..ba3a10c4882 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -164,7 +164,7 @@ impl UnixDatagram { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a Tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. /// # Examples /// ``` /// # use std::error::Error; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 9b76cb01fd7..119dc6fb41c 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -60,7 +60,7 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn bind

(path: P) -> io::Result where P: AsRef, @@ -82,7 +82,7 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::UnixListener) -> io::Result { let listener = mio_uds::UnixListener::from_listener(listener)?; let io = PollEvented::new(listener)?; diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 559fe02a625..6f49849aef3 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -54,7 +54,7 @@ impl UnixStream { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::UnixStream) -> io::Result { let stream = mio_uds::UnixStream::from_stream(stream)?; let io = PollEvented::new(stream)?; diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs index 2cfef8c2dd8..4085a99a975 100644 --- a/tokio/src/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -42,9 +42,7 @@ cfg_resource_drivers! { mod thread; pub(crate) use self::thread::ParkThread; -cfg_block_on! { - pub(crate) use self::thread::{CachedParkThread, ParkError}; -} +pub(crate) use self::thread::{CachedParkThread, ParkError}; use std::sync::Arc; use std::time::Duration; diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 44174d3519f..9ed41310fa5 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -212,118 +212,114 @@ impl Unpark for UnparkThread { } } -cfg_block_on! { - use std::marker::PhantomData; - use std::rc::Rc; +use std::marker::PhantomData; +use std::rc::Rc; - use std::mem; - use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::mem; +use std::task::{RawWaker, RawWakerVTable, Waker}; - /// Blocks the current thread using a condition variable. - #[derive(Debug)] - pub(crate) struct CachedParkThread { - _anchor: PhantomData>, - } - - impl CachedParkThread { - /// Create a new `ParkThread` handle for the current thread. - /// - /// This type cannot be moved to other threads, so it should be created on - /// the thread that the caller intends to park. - pub(crate) fn new() -> CachedParkThread { - CachedParkThread { - _anchor: PhantomData, - } - } - - pub(crate) fn get_unpark(&self) -> Result { - self.with_current(|park_thread| park_thread.unpark()) - } +/// Blocks the current thread using a condition variable. +#[derive(Debug)] +pub(crate) struct CachedParkThread { + _anchor: PhantomData>, +} - /// Get a reference to the `ParkThread` handle for this thread. - fn with_current(&self, f: F) -> Result - where - F: FnOnce(&ParkThread) -> R, - { - CURRENT_PARKER.try_with(|inner| f(inner)) - .map_err(|_| ()) +impl CachedParkThread { + /// Create a new `ParkThread` handle for the current thread. + /// + /// This type cannot be moved to other threads, so it should be created on + /// the thread that the caller intends to park. + pub(crate) fn new() -> CachedParkThread { + CachedParkThread { + _anchor: PhantomData, } } - impl Park for CachedParkThread { - type Unpark = UnparkThread; - type Error = ParkError; + pub(crate) fn get_unpark(&self) -> Result { + self.with_current(|park_thread| park_thread.unpark()) + } - fn unpark(&self) -> Self::Unpark { - self.get_unpark().unwrap() - } + /// Get a reference to the `ParkThread` handle for this thread. + fn with_current(&self, f: F) -> Result + where + F: FnOnce(&ParkThread) -> R, + { + CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ()) + } +} - fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park())?; - Ok(()) - } +impl Park for CachedParkThread { + type Unpark = UnparkThread; + type Error = ParkError; - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; - Ok(()) - } + fn unpark(&self) -> Self::Unpark { + self.get_unpark().unwrap() + } - fn shutdown(&mut self) { - let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); - } + fn park(&mut self) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park())?; + Ok(()) } + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; + Ok(()) + } - impl UnparkThread { - pub(crate) fn into_waker(self) -> Waker { - unsafe { - let raw = unparker_to_raw_waker(self.inner); - Waker::from_raw(raw) - } - } + fn shutdown(&mut self) { + let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); } +} - impl Inner { - #[allow(clippy::wrong_self_convention)] - fn into_raw(this: Arc) -> *const () { - Arc::into_raw(this) as *const () +impl UnparkThread { + pub(crate) fn into_waker(self) -> Waker { + unsafe { + let raw = unparker_to_raw_waker(self.inner); + Waker::from_raw(raw) } + } +} - unsafe fn from_raw(ptr: *const ()) -> Arc { - Arc::from_raw(ptr as *const Inner) - } +impl Inner { + #[allow(clippy::wrong_self_convention)] + fn into_raw(this: Arc) -> *const () { + Arc::into_raw(this) as *const () } - unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { - RawWaker::new( - Inner::into_raw(unparker), - &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), - ) + unsafe fn from_raw(ptr: *const ()) -> Arc { + Arc::from_raw(ptr as *const Inner) } +} - unsafe fn clone(raw: *const ()) -> RawWaker { - let unparker = Inner::from_raw(raw); +unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { + RawWaker::new( + Inner::into_raw(unparker), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), + ) +} - // Increment the ref count - mem::forget(unparker.clone()); +unsafe fn clone(raw: *const ()) -> RawWaker { + let unparker = Inner::from_raw(raw); - unparker_to_raw_waker(unparker) - } + // Increment the ref count + mem::forget(unparker.clone()); - unsafe fn drop_waker(raw: *const ()) { - let _ = Inner::from_raw(raw); - } + unparker_to_raw_waker(unparker) +} - unsafe fn wake(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); - } +unsafe fn drop_waker(raw: *const ()) { + let _ = Inner::from_raw(raw); +} - unsafe fn wake_by_ref(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); +unsafe fn wake(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); +} - // We don't actually own a reference to the unparker - mem::forget(unparker); - } +unsafe fn wake_by_ref(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); + + // We don't actually own a reference to the unparker + mem::forget(unparker); } diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 7e1c257cc86..48cff709d46 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -108,6 +108,7 @@ where } /// Spawns a future onto the thread pool + #[allow(dead_code)] pub(crate) fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 0b36a75f655..a819e9e9461 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -15,7 +15,6 @@ cfg_blocking_impl! { pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { BlockingPool::new(builder, thread_cap) - } } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 5ad5f5f8421..47895fcf477 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -6,6 +6,7 @@ use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; use crate::runtime::builder::ThreadNameFn; +use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; @@ -67,7 +68,7 @@ pub(crate) fn spawn_blocking(func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); + let rt = context::current().expect("not currently running on the Tokio runtime."); let (task, handle) = task::joinable(BlockingTask::new(func)); let _ = rt.blocking_spawner.spawn(task, &rt); @@ -79,7 +80,7 @@ pub(crate) fn try_spawn_blocking(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); + let rt = context::current().expect("not currently running on the Tokio runtime."); let (task, _handle) = task::joinable(BlockingTask::new(func)); rt.blocking_spawner.spawn(task, &rt) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index d149875003f..db01cf5871e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,9 +1,9 @@ +use crate::loom::sync::{Arc, Mutex}; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; use std::fmt; -use std::sync::Arc; /// Builds Tokio Runtime with custom configuration values. /// @@ -67,7 +67,7 @@ pub struct Builder { pub(super) before_stop: Option, } -pub(crate) type ThreadNameFn = Arc String + Send + Sync + 'static>; +pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; #[derive(Debug, Clone, Copy)] enum Kind { @@ -100,7 +100,7 @@ impl Builder { max_threads: 512, // Default thread name - thread_name: Arc::new(|| "tokio-runtime-worker".into()), + thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), // Do not set a stack size by default thread_stack_size: None, @@ -212,7 +212,7 @@ impl Builder { /// ``` pub fn thread_name(&mut self, val: impl Into) -> &mut Self { let val = val.into(); - self.thread_name = Arc::new(move || val.clone()); + self.thread_name = std::sync::Arc::new(move || val.clone()); self } @@ -240,7 +240,7 @@ impl Builder { where F: Fn() -> String + Send + Sync + 'static, { - self.thread_name = Arc::new(f); + self.thread_name = std::sync::Arc::new(f); self } @@ -293,7 +293,7 @@ impl Builder { where F: Fn() + Send + Sync + 'static, { - self.after_start = Some(Arc::new(f)); + self.after_start = Some(std::sync::Arc::new(f)); self } @@ -333,7 +333,7 @@ impl Builder { /// ``` /// use tokio::runtime::Builder; /// - /// let mut rt = Builder::new().build().unwrap(); + /// let rt = Builder::new().build().unwrap(); /// /// rt.block_on(async { /// println!("Hello from the Tokio runtime"); @@ -364,7 +364,7 @@ impl Builder { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Shell(Shell::new(driver)), + kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))), handle: Handle { spawner, io_handle, @@ -463,7 +463,7 @@ cfg_rt_core! { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Basic(scheduler), + kind: Kind::Basic(Mutex::new(Some(scheduler))), handle: Handle { spawner, io_handle, diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 1b267f481e2..c42b3432dea 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -7,8 +7,10 @@ thread_local! { static CONTEXT: RefCell> = RefCell::new(None) } -pub(crate) fn current() -> Option { - CONTEXT.with(|ctx| ctx.borrow().clone()) +cfg_blocking_impl! { + pub(crate) fn current() -> Option { + CONTEXT.with(|ctx| ctx.borrow().clone()) + } } cfg_io_driver! { diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 56a7c57b6c6..bb6e6be012b 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -138,31 +138,29 @@ cfg_rt_threaded! { } } -cfg_block_on! { - impl Enter { - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - pub(crate) fn block_on(&mut self, f: F) -> Result - where - F: std::future::Future, - { - use crate::park::{CachedParkThread, Park}; - use std::task::Context; - use std::task::Poll::Ready; - - let mut park = CachedParkThread::new(); - let waker = park.get_unpark()?.into_waker(); - let mut cx = Context::from_waker(&waker); - - pin!(f); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } - - park.park()?; +impl Enter { + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on(&mut self, f: F) -> Result + where + F: std::future::Future, + { + use crate::park::{CachedParkThread, Park}; + use std::task::Context; + use std::task::Poll::Ready; + + let mut park = CachedParkThread::new(); + let waker = park.get_unpark()?.into_waker(); + let mut cx = Context::from_waker(&waker); + + pin!(f); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); } + + park.park()?; } } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 0716a7fadca..516ad4b3aad 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,16 +1,4 @@ use crate::runtime::{blocking, context, io, time, Spawner}; -use std::{error, fmt}; - -cfg_blocking! { - use crate::runtime::task; - use crate::runtime::blocking::task::BlockingTask; -} - -cfg_rt_core! { - use crate::task::JoinHandle; - - use std::future::Future; -} /// Handle to the runtime. /// @@ -19,7 +7,7 @@ cfg_rt_core! { /// /// [`Runtime::handle`]: crate::runtime::Runtime::handle() #[derive(Debug, Clone)] -pub struct Handle { +pub(crate) struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers @@ -39,333 +27,10 @@ impl Handle { /// Enter the runtime context. This allows you to construct types that must /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. /// It will also allow you to call methods such as [`tokio::spawn`]. - /// - /// This function is also available as [`Runtime::enter`]. - /// - /// [`Delay`]: struct@crate::time::Delay - /// [`TcpStream`]: struct@crate::net::TcpStream - /// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter - /// [`tokio::spawn`]: fn@crate::spawn - /// - /// # Example - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// fn function_that_spawns(msg: String) { - /// // Had we not used `handle.enter` below, this would panic. - /// tokio::spawn(async move { - /// println!("{}", msg); - /// }); - /// } - /// - /// fn main() { - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle().clone(); - /// - /// let s = "Hello World!".to_string(); - /// - /// // By entering the context, we tie `tokio::spawn` to this executor. - /// handle.enter(|| function_that_spawns(s)); - /// } - /// ``` - pub fn enter(&self, f: F) -> R + pub(crate) fn enter(&self, f: F) -> R where F: FnOnce() -> R, { context::enter(self.clone(), f) } - - /// Returns a `Handle` view over the currently running `Runtime` - /// - /// # Panic - /// - /// This will panic if called outside the context of a Tokio runtime. That means that you must - /// call this on one of the threads **being run by the runtime**. Calling this from within a - /// thread created by `std::thread::spawn` (for example) will cause a panic. - /// - /// # Examples - /// - /// This can be used to obtain the handle of the surrounding runtime from an async - /// block or function running on that runtime. - /// - /// ``` - /// # use std::thread; - /// # use tokio::runtime::Runtime; - /// # fn dox() { - /// # let rt = Runtime::new().unwrap(); - /// # rt.spawn(async { - /// use tokio::runtime::Handle; - /// - /// // Inside an async block or function. - /// let handle = Handle::current(); - /// handle.spawn(async { - /// println!("now running in the existing Runtime"); - /// }); - /// - /// # let handle = - /// thread::spawn(move || { - /// // Notice that the handle is created outside of this thread and then moved in - /// handle.block_on(async { /* ... */ }) - /// // This next line would cause a panic - /// // let handle2 = Handle::current(); - /// }); - /// # handle.join().unwrap(); - /// # }); - /// # } - /// ``` - pub fn current() -> Self { - context::current().expect("not currently running on the Tokio runtime.") - } - - /// Returns a Handle view over the currently running Runtime - /// - /// Returns an error if no Runtime has been started - /// - /// Contrary to `current`, this never panics - pub fn try_current() -> Result { - context::current().ok_or(TryCurrentError(())) - } -} - -cfg_rt_core! { - impl Handle { - /// Spawns a future onto the Tokio runtime. - /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle(); - /// - /// // Spawn a future onto the runtime - /// handle.spawn(async { - /// println!("now running on a worker thread"); - /// }); - /// # } - /// ``` - /// - /// # Panics - /// - /// This function will not panic unless task execution is disabled on the - /// executor. This can only happen if the runtime was built using - /// [`Builder`] without picking either [`basic_scheduler`] or - /// [`threaded_scheduler`]. - /// - /// [`Builder`]: struct@crate::runtime::Builder - /// [`threaded_scheduler`]: fn@crate::runtime::Builder::threaded_scheduler - /// [`basic_scheduler`]: fn@crate::runtime::Builder::basic_scheduler - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawner.spawn(future) - } - - /// Run a future to completion on the Tokio runtime from a synchronous - /// context. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// If the provided executor currently has no active core thread, this - /// function might hang until a core thread is added. This is not a - /// concern when using the [threaded scheduler], as it always has active - /// core threads, but if you use the [basic scheduler], some other - /// thread must currently be inside a call to [`Runtime::block_on`]. - /// See also [the module level documentation][1], which has a section on - /// scheduler types. - /// - /// This method may not be called from an asynchronous context. - /// - /// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler - /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler - /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on - /// [1]: index.html#runtime-configurations - /// - /// # Panics - /// - /// This function panics if the provided future panics, or if called - /// within an asynchronous execution context. - /// - /// # Examples - /// - /// Using `block_on` with the [threaded scheduler]. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use std::thread; - /// - /// // Create the runtime. - /// // - /// // If the rt-threaded feature is enabled, this creates a threaded - /// // scheduler by default. - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle().clone(); - /// - /// // Use the runtime from another thread. - /// let th = thread::spawn(move || { - /// // Execute the future, blocking the current thread until completion. - /// // - /// // This example uses the threaded scheduler, so no concurrent call to - /// // `rt.block_on` is required. - /// handle.block_on(async { - /// println!("hello"); - /// }); - /// }); - /// - /// th.join().unwrap(); - /// ``` - /// - /// Using the [basic scheduler] requires a concurrent call to - /// [`Runtime::block_on`]: - /// - /// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler - /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler - /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on - /// - /// ``` - /// use tokio::runtime::Builder; - /// use tokio::sync::oneshot; - /// use std::thread; - /// - /// // Create the runtime. - /// let mut rt = Builder::new() - /// .enable_all() - /// .basic_scheduler() - /// .build() - /// .unwrap(); - /// - /// let handle = rt.handle().clone(); - /// - /// // Signal main thread when task has finished. - /// let (send, recv) = oneshot::channel(); - /// - /// // Use the runtime from another thread. - /// let th = thread::spawn(move || { - /// // Execute the future, blocking the current thread until completion. - /// handle.block_on(async { - /// send.send("done").unwrap(); - /// }); - /// }); - /// - /// // The basic scheduler is used, so the thread above might hang if we - /// // didn't call block_on on the rt too. - /// rt.block_on(async { - /// assert_eq!(recv.await.unwrap(), "done"); - /// }); - /// # th.join().unwrap(); - /// ``` - /// - pub fn block_on(&self, future: F) -> F::Output { - self.enter(|| { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).expect("failed to park thread") - }) - } - } -} - -cfg_blocking! { - impl Handle { - /// Runs the provided closure on a thread where blocking is acceptable. - /// - /// In general, issuing a blocking call or performing a lot of compute in a - /// future without yielding is not okay, as it may prevent the executor from - /// driving other futures forward. This function runs the provided closure - /// on a thread dedicated to blocking operations. See the [CPU-bound tasks - /// and blocking code][blocking] section for more information. - /// - /// Tokio will spawn more blocking threads when they are requested through - /// this function until the upper limit configured on the [`Builder`] is - /// reached. This limit is very large by default, because `spawn_blocking` is - /// often used for various kinds of IO operations that cannot be performed - /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you - /// should keep this large upper limit in mind; to run your CPU-bound - /// computations on only a few threads, you should use a separate thread - /// pool such as [rayon] rather than configuring the number of blocking - /// threads. - /// - /// This function is intended for non-async operations that eventually - /// finish on their own. If you want to spawn an ordinary thread, you should - /// use [`thread::spawn`] instead. - /// - /// Closures spawned using `spawn_blocking` cannot be cancelled. When you - /// shut down the executor, it will wait indefinitely for all blocking - /// operations to finish. You can use [`shutdown_timeout`] to stop waiting - /// for them after a certain timeout. Be aware that this will still not - /// cancel the tasks — they are simply allowed to keep running after the - /// method returns. - /// - /// Note that if you are using the [basic scheduler], this function will - /// still spawn additional threads for blocking operations. The basic - /// scheduler's single thread is only used for asynchronous code. - /// - /// [`Builder`]: struct@crate::runtime::Builder - /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code - /// [rayon]: https://docs.rs/rayon - /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler - /// [`thread::spawn`]: fn@std::thread::spawn - /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # async fn docs() -> Result<(), Box>{ - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle(); - /// - /// let res = handle.spawn_blocking(move || { - /// // do some compute-heavy work or call synchronous code - /// "done computing" - /// }).await?; - /// - /// assert_eq!(res, "done computing"); - /// # Ok(()) - /// # } - /// ``` - pub fn spawn_blocking(&self, f: F) -> JoinHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (task, handle) = task::joinable(BlockingTask::new(f)); - let _ = self.blocking_spawner.spawn(task, self); - handle - } - } -} - -/// Error returned by `try_current` when no Runtime has been started -pub struct TryCurrentError(()); - -impl fmt::Debug for TryCurrentError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryCurrentError").finish() - } -} - -impl fmt::Display for TryCurrentError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("no tokio Runtime has been initialized") - } } - -impl error::Error for TryCurrentError {} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 637f38cabb5..9d26446bf7e 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -69,7 +69,7 @@ //! //! fn main() -> Result<(), Box> { //! // Create the runtime -//! let mut rt = Runtime::new()?; +//! let rt = Runtime::new()?; //! //! // Spawn the root task //! rt.block_on(async { @@ -212,7 +212,7 @@ pub(crate) mod enter; use self::enter::enter; mod handle; -pub use self::handle::{Handle, TryCurrentError}; +use handle::Handle; mod io; @@ -240,6 +240,7 @@ cfg_rt_core! { use crate::task::JoinHandle; } +use crate::loom::sync::Mutex; use std::future::Future; use std::time::Duration; @@ -288,11 +289,11 @@ pub struct Runtime { enum Kind { /// Not able to execute concurrent tasks. This variant is mostly used to get /// access to the driver handles. - Shell(Shell), + Shell(Mutex>), /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(BasicScheduler), + Basic(Mutex>>), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] @@ -397,7 +398,7 @@ impl Runtime { Kind::Shell(_) => panic!("task execution disabled"), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(exec) => exec.spawn(future), + Kind::Basic(_exec) => self.handle.spawner.spawn(future), } } @@ -408,10 +409,11 @@ impl Runtime { /// complete, and yielding its resolved result. Any tasks or timers which /// the future spawns internally will be executed on the runtime. /// - /// `&mut` is required as calling `block_on` **may** result in advancing the - /// state of the runtime. The details depend on how the runtime is - /// configured. [`runtime::Handle::block_on`][handle] provides a version - /// that takes `&self`. + /// When this runtime is configured with `core_threads = 0`, only the first call + /// to `block_on` will run the IO and timer drivers. Calls to other methods _before_ the first + /// `block_on` completes will just hook into the driver running on the thread + /// that first called `block_on`. This means that the driver may be passed + /// from thread to thread by the user between calls to `block_on`. /// /// This method may not be called from an asynchronous context. /// @@ -426,7 +428,7 @@ impl Runtime { /// use tokio::runtime::Runtime; /// /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// /// // Execute the future, blocking the current thread until completion /// rt.block_on(async { @@ -435,13 +437,45 @@ impl Runtime { /// ``` /// /// [handle]: fn@Handle::block_on - pub fn block_on(&mut self, future: F) -> F::Output { - let kind = &mut self.kind; - - self.handle.enter(|| match kind { - Kind::Shell(exec) => exec.block_on(future), + pub fn block_on(&self, future: F) -> F::Output { + self.handle.enter(|| match &self.kind { + Kind::Shell(exec) => { + // TODO(lucio): clean this up and move this impl into + // `shell.rs`, this is hacky and bad but will work for + // now. + let exec_temp = { + let mut lock = exec.lock().unwrap(); + lock.take() + }; + + if let Some(mut exec_temp) = exec_temp { + let res = exec_temp.block_on(future); + exec.lock().unwrap().replace(exec_temp); + res + } else { + let mut enter = crate::runtime::enter(true); + enter.block_on(future).unwrap() + } + } #[cfg(feature = "rt-core")] - Kind::Basic(exec) => exec.block_on(future), + Kind::Basic(exec) => { + // TODO(lucio): clean this up and move this impl into + // `basic_scheduler.rs`, this is hacky and bad but will work for + // now. + let exec_temp = { + let mut lock = exec.lock().unwrap(); + lock.take() + }; + + if let Some(mut exec_temp) = exec_temp { + let res = exec_temp.block_on(future); + exec.lock().unwrap().replace(exec_temp); + res + } else { + let mut enter = crate::runtime::enter(true); + enter.block_on(future).unwrap() + } + } #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.block_on(future), }) @@ -451,11 +485,8 @@ impl Runtime { /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. /// It will also allow you to call methods such as [`tokio::spawn`]. /// - /// This function is also available as [`Handle::enter`]. - /// /// [`Delay`]: struct@crate::time::Delay /// [`TcpStream`]: struct@crate::net::TcpStream - /// [`Handle::enter`]: fn@crate::runtime::Handle::enter /// [`tokio::spawn`]: fn@crate::spawn /// /// # Example @@ -486,27 +517,6 @@ impl Runtime { self.handle.enter(f) } - /// Return a handle to the runtime's spawner. - /// - /// The returned handle can be used to spawn tasks that run on this runtime, and can - /// be cloned to allow moving the `Handle` to other threads. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let handle = rt.handle(); - /// - /// handle.spawn(async { println!("hello"); }); - /// ``` - pub fn handle(&self) -> &Handle { - &self.handle - } - /// Shutdown the runtime, waiting for at most `duration` for all spawned /// task to shutdown. /// @@ -531,7 +541,7 @@ impl Runtime { /// use std::time::Duration; /// /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); + /// let runtime = Runtime::new().unwrap(); /// /// runtime.block_on(async move { /// task::spawn_blocking(move || { @@ -565,7 +575,7 @@ impl Runtime { /// use tokio::runtime::Runtime; /// /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); + /// let runtime = Runtime::new().unwrap(); /// /// runtime.block_on(async move { /// let inner_runtime = Runtime::new().unwrap(); diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index c08658cde87..ed484846311 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -178,7 +178,7 @@ mod group_b { #[test] fn join_output() { loom::model(|| { - let mut rt = mk_pool(1); + let rt = mk_pool(1); rt.block_on(async { let t = crate::spawn(track(async { "hello" })); @@ -192,7 +192,7 @@ mod group_b { #[test] fn poll_drop_handle_then_drop() { loom::model(|| { - let mut rt = mk_pool(1); + let rt = mk_pool(1); rt.block_on(async move { let mut t = crate::spawn(track(async { "hello" })); diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 50edd2b6c48..9e55a5e60e0 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -185,7 +185,7 @@ mod tests { #[test] fn smoke() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let registry = Registry::new(vec![ EventInfo::default(), @@ -247,7 +247,7 @@ mod tests { #[test] fn broadcast_cleans_up_disconnected_listeners() { - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); rt.block_on(async { let registry = Registry::new(vec![EventInfo::default()]); diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index f55e504b00f..b3f495b6111 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -272,7 +272,7 @@ mod tests { #[test] fn ctrl_break() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let mut ctrl_break = assert_ok!(super::ctrl_break()); diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3c409edfb90..a56453df401 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -312,9 +312,9 @@ impl LocalSet { /// use tokio::runtime::Runtime; /// use tokio::task; /// - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// let local = task::LocalSet::new(); - /// local.block_on(&mut rt, async { + /// local.block_on(&rt, async { /// let join = task::spawn_local(async { /// let blocking_result = task::block_in_place(|| { /// // ... @@ -329,9 +329,9 @@ impl LocalSet { /// use tokio::runtime::Runtime; /// use tokio::task; /// - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// let local = task::LocalSet::new(); - /// local.block_on(&mut rt, async { + /// local.block_on(&rt, async { /// let join = task::spawn_local(async { /// let blocking_result = task::spawn_blocking(|| { /// // ... @@ -346,7 +346,7 @@ impl LocalSet { /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on /// [in-place blocking]: fn@crate::task::block_in_place /// [`spawn_blocking`]: fn@crate::task::spawn_blocking - pub fn block_on(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output + pub fn block_on(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future, { diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index d6e771184f2..280e90ead04 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -18,7 +18,7 @@ doc_rt_core! { /// /// This function must be called from the context of a Tokio runtime. Tasks running on /// the Tokio runtime are always inside its context, but you can also enter the context - /// using the [`Handle::enter`](crate::runtime::Handle::enter()) method. + /// using the [`Runtime::enter`](crate::runtime::Runtime::enter()) method. /// /// # Examples /// diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs index b85abd8c2ac..d4f4f8d48cf 100644 --- a/tokio/tests/io_driver.rs +++ b/tokio/tests/io_driver.rs @@ -45,7 +45,7 @@ fn test_drop_on_notify() { // shutting down. Then, when the task handle is dropped, the task itself is // dropped. - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .basic_scheduler() .enable_all() .build() diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 0885992d7d2..3813c48006c 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -12,7 +12,7 @@ use std::time::Duration; fn spawned_task_does_not_progress_without_block_on() { let (tx, mut rx) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { assert_ok!(tx.send("hello")); @@ -65,7 +65,7 @@ fn no_extra_poll() { }; let npolls = Arc::clone(&rx.npolls); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { while rx.next().await.is_some() {} }); rt.block_on(async { @@ -100,7 +100,7 @@ fn acquire_mutex_in_drop() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); rt.spawn(async move { let _ = rx2.await; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 4211a66705e..35e2ea81a02 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -9,38 +9,41 @@ macro_rules! rt_test { mod basic_scheduler { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc { tokio::runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_4_threads { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc { tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(4) .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_1_thread { $($t)* - fn rt() -> Runtime { + fn rt() -> Arc { tokio::runtime::Builder::new() .threaded_scheduler() .core_threads(1) .enable_all() .build() .unwrap() + .into() } } } @@ -72,7 +75,7 @@ rt_test! { #[test] fn block_on_sync() { - let mut rt = rt(); + let rt = rt(); let mut win = false; rt.block_on(async { @@ -82,41 +85,12 @@ rt_test! { assert!(win); } - #[test] - fn block_on_handle_sync() { - let rt = rt(); - - let mut win = false; - rt.handle().block_on(async { - win = true; - }); - - assert!(win); - } #[test] fn block_on_async() { - let mut rt = rt(); - - let out = rt.block_on(async { - let (tx, rx) = oneshot::channel(); - - thread::spawn(move || { - thread::sleep(Duration::from_millis(50)); - tx.send("ZOMG").unwrap(); - }); - - assert_ok!(rx.await) - }); - - assert_eq!(out, "ZOMG"); - } - - #[test] - fn block_on_handle_async() { let rt = rt(); - let out = rt.handle().block_on(async { + let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); thread::spawn(move || { @@ -132,7 +106,7 @@ rt_test! { #[test] fn spawn_one_bg() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -149,7 +123,7 @@ rt_test! { #[test] fn spawn_one_join() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -172,7 +146,7 @@ rt_test! { #[test] fn spawn_two() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -199,7 +173,7 @@ rt_test! { const ITER: usize = 200; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (done_tx, mut done_rx) = mpsc::unbounded_channel(); @@ -249,7 +223,7 @@ rt_test! { const ITER: usize = 500; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { tokio::spawn(async move { @@ -305,7 +279,7 @@ rt_test! { #[test] fn spawn_await_chain() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { assert_ok!(tokio::spawn(async { @@ -320,7 +294,7 @@ rt_test! { #[test] fn outstanding_tasks_dropped() { - let mut rt = rt(); + let rt = rt(); let cnt = Arc::new(()); @@ -343,16 +317,16 @@ rt_test! { #[test] #[should_panic] fn nested_rt() { - let mut rt1 = rt(); - let mut rt2 = rt(); + let rt1 = rt(); + let rt2 = rt(); rt1.block_on(async { rt2.block_on(async { "hello" }) }); } #[test] fn create_rt_in_block_on() { - let mut rt1 = rt(); - let mut rt2 = rt1.block_on(async { rt() }); + let rt1 = rt(); + let rt2 = rt1.block_on(async { rt() }); let out = rt2.block_on(async { "ZOMG" }); assert_eq!(out, "ZOMG"); @@ -360,7 +334,7 @@ rt_test! { #[test] fn complete_block_on_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -383,7 +357,7 @@ rt_test! { #[test] fn complete_task_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -412,8 +386,8 @@ rt_test! { #[test] fn spawn_from_other_thread_idle() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -432,8 +406,8 @@ rt_test! { #[test] fn spawn_from_other_thread_under_load() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -457,7 +431,7 @@ rt_test! { #[test] fn delay_at_root() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -471,7 +445,7 @@ rt_test! { #[test] fn delay_in_spawn() { - let mut rt = rt(); + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -492,7 +466,7 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); @@ -512,7 +486,7 @@ rt_test! { #[test] fn spawn_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -527,7 +501,7 @@ rt_test! { #[test] fn spawn_blocking_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -542,7 +516,7 @@ rt_test! { #[test] fn delay_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { assert_ok!(tokio::task::spawn_blocking(|| { @@ -562,7 +536,7 @@ rt_test! { #[test] fn socket_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); @@ -586,7 +560,7 @@ rt_test! { #[test] fn spawn_blocking_after_shutdown() { let rt = rt(); - let handle = rt.handle().clone(); + let handle = rt.clone(); // Shutdown drop(rt); @@ -615,7 +589,7 @@ rt_test! { // test is disabled. #[cfg(not(windows))] fn io_driver_called_when_under_load() { - let mut rt = rt(); + let rt = rt(); // Create a lot of constant load. The scheduler will always be busy. for _ in 0..100 { @@ -651,7 +625,7 @@ rt_test! { #[test] fn client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async move { client_server(tx).await }); @@ -662,7 +636,7 @@ rt_test! { #[test] fn panic_in_task() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = oneshot::channel(); struct Boom(Option>); @@ -689,7 +663,7 @@ rt_test! { #[test] #[should_panic] fn panic_in_block_on() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { panic!() }); } @@ -709,7 +683,7 @@ rt_test! { #[test] fn enter_and_spawn() { - let mut rt = rt(); + let rt = rt(); let handle = rt.enter(|| { tokio::spawn(async {}) }); @@ -739,7 +713,7 @@ rt_test! { } } - let mut rt = rt(); + let rt = rt(); let (drop_tx, drop_rx) = mpsc::channel(); let (run_tx, run_rx) = oneshot::channel(); @@ -775,17 +749,17 @@ rt_test! { let (tx2, rx2) = oneshot::channel(); let (tx3, rx3) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); - let h1 = rt.handle().clone(); + let h1 = rt.clone(); - rt.handle().spawn(async move { + rt.spawn(async move { // Ensure a waker gets stored in oneshot 1. let _ = rx1.await; tx3.send(()).unwrap(); }); - rt.handle().spawn(async move { + rt.spawn(async move { // When this task is dropped, we'll be "closing remotes". // We spawn a new task that owns the `tx1`, to move its Drop // out of here. @@ -802,7 +776,7 @@ rt_test! { let _ = rx2.await; }); - rt.handle().spawn(async move { + rt.spawn(async move { let _ = rx3.await; // We'll never get here, but once task 3 drops, this will // force task 2 to re-schedule since it's waiting on oneshot 2. @@ -823,7 +797,7 @@ rt_test! { use std::net::Ipv6Addr; for _ in 1..10 { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); @@ -854,7 +828,7 @@ rt_test! { #[test] fn shutdown_timeout() { let (tx, rx) = oneshot::channel(); - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { task::spawn_blocking(move || { @@ -865,18 +839,18 @@ rt_test! { rx.await.unwrap(); }); - runtime.shutdown_timeout(Duration::from_millis(100)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100)); } #[test] fn shutdown_wakeup_time() { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { tokio::time::delay_for(std::time::Duration::from_millis(100)).await; }); - runtime.shutdown_timeout(Duration::from_secs(10_000)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000)); } // This test is currently ignored on Windows because of a @@ -894,7 +868,9 @@ rt_test! { thread::spawn(|| { R.with(|cell| { - *cell.borrow_mut() = Some(rt()); + let rt = rt(); + let rt = Arc::try_unwrap(rt).unwrap(); + *cell.borrow_mut() = Some(rt); }); let _rt = rt(); @@ -927,10 +903,10 @@ rt_test! { #[test] fn local_set_block_on_socket() { - let mut rt = rt(); + let rt = rt(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -948,12 +924,12 @@ rt_test! { #[test] fn local_set_client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { client_server_local(tx).await }); + local.block_on(&rt, async move { client_server_local(tx).await }); assert_ok!(rx.try_recv()); assert_err!(rx.try_recv()); @@ -987,7 +963,7 @@ rt_test! { fn coop() { use std::task::Poll::Ready; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { // Create a bunch of tasks @@ -1019,7 +995,7 @@ rt_test! { const NUM: usize = 100; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index b5ec96dec35..a67c090ebf4 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -57,22 +57,20 @@ fn many_oneshot_futures() { } #[test] fn many_multishot_futures() { - use tokio::sync::mpsc; - const CHAIN: usize = 200; const CYCLES: usize = 5; const TRACKS: usize = 50; for _ in 0..50 { - let mut rt = rt(); + let rt = rt(); let mut start_txs = Vec::with_capacity(TRACKS); let mut final_rxs = Vec::with_capacity(TRACKS); for _ in 0..TRACKS { - let (start_tx, mut chain_rx) = mpsc::channel(10); + let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10); for _ in 0..CHAIN { - let (mut next_tx, next_rx) = mpsc::channel(10); + let (mut next_tx, next_rx) = tokio::sync::mpsc::channel(10); // Forward all the messages rt.spawn(async move { @@ -85,7 +83,7 @@ fn many_multishot_futures() { } // This final task cycles if needed - let (mut final_tx, final_rx) = mpsc::channel(10); + let (mut final_tx, final_rx) = tokio::sync::mpsc::channel(10); let mut cycle_tx = start_tx.clone(); let mut rem = CYCLES; @@ -123,7 +121,7 @@ fn many_multishot_futures() { #[test] fn spawn_shutdown() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async { @@ -230,7 +228,7 @@ fn start_stop_callbacks_called() { let after_inner = after_start.clone(); let before_inner = before_stop.clone(); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .threaded_scheduler() .enable_all() .on_thread_start(move || { @@ -331,9 +329,7 @@ fn multi_threadpool() { // channel yields occasionally even if there are values ready to receive. #[test] fn coop_and_block_in_place() { - use tokio::sync::mpsc; - - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .threaded_scheduler() // Setting max threads to 1 prevents another thread from claiming the // runtime worker yielded as part of `block_in_place` and guarantees the @@ -344,7 +340,7 @@ fn coop_and_block_in_place() { .unwrap(); rt.block_on(async move { - let (mut tx, mut rx) = mpsc::channel(1024); + let (mut tx, mut rx) = tokio::sync::mpsc::channel(1024); // Fill the channel for _ in 0..1024 { diff --git a/tokio/tests/signal_drop_rt.rs b/tokio/tests/signal_drop_rt.rs index aeedd96e4e6..709e0d41834 100644 --- a/tokio/tests/signal_drop_rt.rs +++ b/tokio/tests/signal_drop_rt.rs @@ -14,11 +14,11 @@ use tokio::signal::unix::{signal, SignalKind}; fn dropping_loops_does_not_cause_starvation() { let kind = SignalKind::user_defined1(); - let mut first_rt = rt(); + let first_rt = rt(); let mut first_signal = first_rt.block_on(async { signal(kind).expect("failed to register first signal") }); - let mut second_rt = rt(); + let second_rt = rt(); let mut second_signal = second_rt.block_on(async { signal(kind).expect("failed to register second signal") }); diff --git a/tokio/tests/signal_multi_rt.rs b/tokio/tests/signal_multi_rt.rs index 9d78469578c..78319a75331 100644 --- a/tokio/tests/signal_multi_rt.rs +++ b/tokio/tests/signal_multi_rt.rs @@ -24,7 +24,7 @@ fn multi_loop() { .map(|_| { let sender = sender.clone(); thread::spawn(move || { - let mut rt = rt(); + let rt = rt(); let _ = rt.block_on(async { let mut signal = signal(SignalKind::hangup()).unwrap(); sender.send(()).unwrap(); diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 4ca1596e052..6cb11584b4a 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -79,7 +79,7 @@ async fn no_block_in_basic_scheduler() { #[test] fn yes_block_in_threaded_block_on() { - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -91,7 +91,7 @@ fn yes_block_in_threaded_block_on() { #[test] #[should_panic] fn no_block_in_basic_block_on() { - let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + let rt = runtime::Builder::new().basic_scheduler().build().unwrap(); rt.block_on(async { task::block_in_place(|| {}); }); @@ -99,14 +99,14 @@ fn no_block_in_basic_block_on() { #[test] fn can_enter_basic_rt_from_within_block_in_place() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); outer.block_on(async { tokio::task::block_in_place(|| { - let mut inner = tokio::runtime::Builder::new() + let inner = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -120,7 +120,7 @@ fn can_enter_basic_rt_from_within_block_in_place() { fn useful_panic_message_when_dropping_rt_in_rt() { use std::panic::{catch_unwind, AssertUnwindSafe}; - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -147,7 +147,7 @@ fn useful_panic_message_when_dropping_rt_in_rt() { #[test] fn can_shutdown_with_zero_timeout_in_runtime() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -163,7 +163,7 @@ fn can_shutdown_with_zero_timeout_in_runtime() { #[test] fn can_shutdown_now_in_runtime() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -179,7 +179,7 @@ fn can_shutdown_now_in_runtime() { #[test] fn coop_disabled_in_block_in_place() { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .enable_time() .build() @@ -213,7 +213,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { let (done_tx, done_rx) = std::sync::mpsc::channel(); let done = done_tx.clone(); thread::spawn(move || { - let mut outer = tokio::runtime::Builder::new() + let outer = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index bf80b8ee5f5..23e92586b78 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -133,12 +133,12 @@ fn local_threadpool_blocking_in_place() { ON_RT_THREAD.with(|cell| cell.set(true)); - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .enable_all() .build() .unwrap(); - LocalSet::new().block_on(&mut rt, async { + LocalSet::new().block_on(&rt, async { assert!(ON_RT_THREAD.with(|cell| cell.get())); let join = task::spawn_local(async move { assert!(ON_RT_THREAD.with(|cell| cell.get())); @@ -246,12 +246,12 @@ fn join_local_future_elsewhere() { ON_RT_THREAD.with(|cell| cell.set(true)); - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); let local = LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); let join = task::spawn_local(async move { println!("hello world running..."); @@ -286,7 +286,7 @@ fn drop_cancels_tasks() { use std::rc::Rc; // This test reproduces issue #1842 - let mut rt = rt(); + let rt = rt(); let rc1 = Rc::new(()); let rc2 = rc1.clone(); @@ -303,7 +303,7 @@ fn drop_cancels_tasks() { } }); - local.block_on(&mut rt, async { + local.block_on(&rt, async { started_rx.await.unwrap(); }); drop(local); @@ -362,11 +362,11 @@ fn drop_cancels_remote_tasks() { with_timeout(Duration::from_secs(60), || { let (tx, mut rx) = mpsc::channel::<()>(1024); - let mut rt = rt(); + let rt = rt(); let local = LocalSet::new(); local.spawn_local(async move { while rx.recv().await.is_some() {} }); - local.block_on(&mut rt, async { + local.block_on(&rt, async { time::delay_for(Duration::from_millis(1)).await; }); @@ -385,7 +385,7 @@ fn local_tasks_wake_join_all() { use futures::future::join_all; use tokio::task::LocalSet; - let mut rt = rt(); + let rt = rt(); let set = LocalSet::new(); let mut handles = Vec::new(); diff --git a/tokio/tests/time_rt.rs b/tokio/tests/time_rt.rs index b739f1b2f68..19bcd27d9b5 100644 --- a/tokio/tests/time_rt.rs +++ b/tokio/tests/time_rt.rs @@ -28,7 +28,7 @@ fn timer_with_threaded_runtime() { fn timer_with_basic_scheduler() { use tokio::runtime::Builder; - let mut rt = Builder::new() + let rt = Builder::new() .basic_scheduler() .enable_all() .build()