From 0e2a31f9c70c57fd0e7fa7db72cccc6306f3a2ea Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 14 Jul 2021 11:06:44 +0200 Subject: [PATCH 1/2] chore: use the loom mutex wrapper --- tokio/src/loom/std/atomic_u64.rs | 10 +- tokio/src/runtime/shell.rs | 132 ------------------------ tokio/src/runtime/tests/loom_oneshot.rs | 7 +- tokio/src/sync/barrier.rs | 5 +- tokio/src/task/local.rs | 9 +- tokio/src/time/clock.rs | 12 +-- 6 files changed, 20 insertions(+), 155 deletions(-) delete mode 100644 tokio/src/runtime/shell.rs diff --git a/tokio/src/loom/std/atomic_u64.rs b/tokio/src/loom/std/atomic_u64.rs index a86a195b1d2..2d7f6ff0449 100644 --- a/tokio/src/loom/std/atomic_u64.rs +++ b/tokio/src/loom/std/atomic_u64.rs @@ -16,7 +16,7 @@ mod imp { #[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))] mod imp { use std::sync::atomic::Ordering; - use std::sync::Mutex; + use crate::loom::sync::Mutex; #[derive(Debug)] pub(crate) struct AtomicU64 { @@ -31,15 +31,15 @@ mod imp { } pub(crate) fn load(&self, _: Ordering) -> u64 { - *self.inner.lock().unwrap() + *self.inner.lock() } pub(crate) fn store(&self, val: u64, _: Ordering) { - *self.inner.lock().unwrap() = val; + *self.inner.lock() = val; } pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 { - let mut lock = self.inner.lock().unwrap(); + let mut lock = self.inner.lock(); let prev = *lock; *lock = prev | val; prev @@ -52,7 +52,7 @@ mod imp { _success: Ordering, _failure: Ordering, ) -> Result { - let mut lock = self.inner.lock().unwrap(); + let mut lock = self.inner.lock(); if *lock == current { *lock = new; diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs deleted file mode 100644 index 486d4fa5bbe..00000000000 --- a/tokio/src/runtime/shell.rs +++ /dev/null @@ -1,132 +0,0 @@ -#![allow(clippy::redundant_clone)] - -use crate::future::poll_fn; -use crate::park::{Park, Unpark}; -use crate::runtime::driver::Driver; -use crate::sync::Notify; -use crate::util::{waker_ref, Wake}; - -use std::sync::{Arc, Mutex}; -use std::task::Context; -use std::task::Poll::{Pending, Ready}; -use std::{future::Future, sync::PoisonError}; - -#[derive(Debug)] -pub(super) struct Shell { - driver: Mutex>, - - notify: Notify, - - /// TODO: don't store this - unpark: Arc, -} - -#[derive(Debug)] -struct Handle(::Unpark); - -impl Shell { - pub(super) fn new(driver: Driver) -> Shell { - let unpark = Arc::new(Handle(driver.unpark())); - - Shell { - driver: Mutex::new(Some(driver)), - notify: Notify::new(), - unpark, - } - } - - pub(super) fn block_on(&self, f: F) -> F::Output - where - F: Future, - { - let mut enter = crate::runtime::enter(true); - - pin!(f); - - loop { - if let Some(driver) = &mut self.take_driver() { - return driver.block_on(f); - } else { - let notified = self.notify.notified(); - pin!(notified); - - if let Some(out) = enter - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } - - if let Ready(out) = f.as_mut().poll(cx) { - return Ready(Some(out)); - } - - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; - } - } - } - } - - fn take_driver(&self) -> Option> { - let mut lock = self.driver.lock().unwrap(); - let driver = lock.take()?; - - Some(DriverGuard { - inner: Some(driver), - shell: &self, - }) - } -} - -impl Wake for Handle { - /// Wake by value - fn wake(self: Arc) { - Wake::wake_by_ref(&self); - } - - /// Wake by reference - fn wake_by_ref(arc_self: &Arc) { - arc_self.0.unpark(); - } -} - -struct DriverGuard<'a> { - inner: Option, - shell: &'a Shell, -} - -impl DriverGuard<'_> { - fn block_on(&mut self, f: F) -> F::Output { - let driver = self.inner.as_mut().unwrap(); - - pin!(f); - - let waker = waker_ref(&self.shell.unpark); - let mut cx = Context::from_waker(&waker); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return v; - } - - driver.park().unwrap(); - } - } -} - -impl Drop for DriverGuard<'_> { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - self.shell - .driver - .lock() - .unwrap_or_else(PoisonError::into_inner) - .replace(inner); - - self.shell.notify.notify_one(); - } - } -} diff --git a/tokio/src/runtime/tests/loom_oneshot.rs b/tokio/src/runtime/tests/loom_oneshot.rs index c126fe479af..ca4f472afb4 100644 --- a/tokio/src/runtime/tests/loom_oneshot.rs +++ b/tokio/src/runtime/tests/loom_oneshot.rs @@ -1,6 +1,5 @@ use loom::sync::Notify; - -use std::sync::{Arc, Mutex}; +use crate::loom::sync::{Arc, Mutex}; pub(crate) fn channel() -> (Sender, Receiver) { let inner = Arc::new(Inner { @@ -31,7 +30,7 @@ struct Inner { impl Sender { pub(crate) fn send(self, value: T) { - *self.inner.value.lock().unwrap() = Some(value); + *self.inner.value.lock() = Some(value); self.inner.notify.notify(); } } @@ -39,7 +38,7 @@ impl Sender { impl Receiver { pub(crate) fn recv(self) -> T { loop { - if let Some(v) = self.inner.value.lock().unwrap().take() { + if let Some(v) = self.inner.value.lock().take() { return v; } diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index e3c95f6a6b6..7376d5f9332 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -1,6 +1,5 @@ use crate::sync::watch; - -use std::sync::Mutex; +use crate::loom::sync::Mutex; /// A barrier enables multiple tasks to synchronize the beginning of some computation. /// @@ -94,7 +93,7 @@ impl Barrier { // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across // a yield point, and thus marks the returned future as !Send. let generation = { - let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock(); let generation = state.generation; state.arrived += 1; if state.arrived == self.n { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 01af63fc9c0..d8f7d800638 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1,6 +1,7 @@ //! Runs `!Send` futures on the current thread. use crate::runtime::task::{self, JoinHandle, OwnedTasks, Task}; use crate::sync::AtomicWaker; +use crate::loom::sync::{Arc, Mutex}; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; @@ -8,7 +9,6 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::Poll; use pin_project_lite::pin_project; @@ -537,7 +537,6 @@ impl LocalSet { .shared .queue .lock() - .unwrap() .pop_front() .or_else(|| self.context.tasks.borrow_mut().queue.pop_front()) } else { @@ -546,7 +545,7 @@ impl LocalSet { .borrow_mut() .queue .pop_front() - .or_else(|| self.context.shared.queue.lock().unwrap().pop_front()) + .or_else(|| self.context.shared.queue.lock().pop_front()) } } @@ -610,7 +609,7 @@ impl Drop for LocalSet { task.shutdown(); } - for task in self.context.shared.queue.lock().unwrap().drain(..) { + for task in self.context.shared.queue.lock().drain(..) { task.shutdown(); } @@ -660,7 +659,7 @@ impl Shared { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().unwrap().push_back(task); + self.queue.lock().push_back(task); self.waker.wake(); } }); diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index b9ec5c5aab3..a44d75f3ce1 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -29,7 +29,7 @@ cfg_not_test_util! { cfg_test_util! { use crate::time::{Duration, Instant}; - use std::sync::{Arc, Mutex}; + use crate::loom::sync::{Arc, Mutex}; cfg_rt! { fn clock() -> Option { @@ -102,7 +102,7 @@ cfg_test_util! { /// runtime. pub fn resume() { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - let mut inner = clock.inner.lock().unwrap(); + let mut inner = clock.inner.lock(); if inner.unfrozen.is_some() { panic!("time is not frozen"); @@ -164,7 +164,7 @@ cfg_test_util! { } pub(crate) fn pause(&self) { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock(); if !inner.enable_pausing { drop(inner); // avoid poisoning the lock @@ -178,12 +178,12 @@ cfg_test_util! { } pub(crate) fn is_paused(&self) -> bool { - let inner = self.inner.lock().unwrap(); + let inner = self.inner.lock(); inner.unfrozen.is_none() } pub(crate) fn advance(&self, duration: Duration) { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock(); if inner.unfrozen.is_some() { panic!("time is not frozen"); @@ -193,7 +193,7 @@ cfg_test_util! { } pub(crate) fn now(&self) -> Instant { - let inner = self.inner.lock().unwrap(); + let inner = self.inner.lock(); let mut ret = inner.base; From 0b61b9016d5379f8d351a01359a482cde5857173 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 14 Jul 2021 11:11:40 +0200 Subject: [PATCH 2/2] rustfmt --- tokio/src/loom/std/atomic_u64.rs | 2 +- tokio/src/runtime/tests/loom_oneshot.rs | 2 +- tokio/src/sync/barrier.rs | 2 +- tokio/src/task/local.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/loom/std/atomic_u64.rs b/tokio/src/loom/std/atomic_u64.rs index 2d7f6ff0449..7eb457a2405 100644 --- a/tokio/src/loom/std/atomic_u64.rs +++ b/tokio/src/loom/std/atomic_u64.rs @@ -15,8 +15,8 @@ mod imp { #[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))] mod imp { - use std::sync::atomic::Ordering; use crate::loom::sync::Mutex; + use std::sync::atomic::Ordering; #[derive(Debug)] pub(crate) struct AtomicU64 { diff --git a/tokio/src/runtime/tests/loom_oneshot.rs b/tokio/src/runtime/tests/loom_oneshot.rs index ca4f472afb4..87eb6386425 100644 --- a/tokio/src/runtime/tests/loom_oneshot.rs +++ b/tokio/src/runtime/tests/loom_oneshot.rs @@ -1,5 +1,5 @@ -use loom::sync::Notify; use crate::loom::sync::{Arc, Mutex}; +use loom::sync::Notify; pub(crate) fn channel() -> (Sender, Receiver) { let inner = Arc::new(Inner { diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 7376d5f9332..0e39dac8bb5 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -1,5 +1,5 @@ -use crate::sync::watch; use crate::loom::sync::Mutex; +use crate::sync::watch; /// A barrier enables multiple tasks to synchronize the beginning of some computation. /// diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index d8f7d800638..7404cc2c19b 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1,7 +1,7 @@ //! Runs `!Send` futures on the current thread. +use crate::loom::sync::{Arc, Mutex}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Task}; use crate::sync::AtomicWaker; -use crate::loom::sync::{Arc, Mutex}; use std::cell::{Cell, RefCell}; use std::collections::VecDeque;