Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the loom mutex wrapper everywhere #3958

Merged
merged 2 commits into from Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions tokio/src/loom/std/atomic_u64.rs
Expand Up @@ -15,8 +15,8 @@ mod imp {

#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))]
mod imp {
use crate::loom::sync::Mutex;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

#[derive(Debug)]
pub(crate) struct AtomicU64 {
Expand All @@ -31,15 +31,15 @@ mod imp {
}

pub(crate) fn load(&self, _: Ordering) -> u64 {
*self.inner.lock().unwrap()
*self.inner.lock()
}

pub(crate) fn store(&self, val: u64, _: Ordering) {
*self.inner.lock().unwrap() = val;
*self.inner.lock() = val;
}

pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
let mut lock = self.inner.lock().unwrap();
let mut lock = self.inner.lock();
let prev = *lock;
*lock = prev | val;
prev
Expand All @@ -52,7 +52,7 @@ mod imp {
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
let mut lock = self.inner.lock().unwrap();
let mut lock = self.inner.lock();

if *lock == current {
*lock = new;
Expand Down
132 changes: 0 additions & 132 deletions tokio/src/runtime/shell.rs

This file was deleted.

7 changes: 3 additions & 4 deletions tokio/src/runtime/tests/loom_oneshot.rs
@@ -1,7 +1,6 @@
use crate::loom::sync::{Arc, Mutex};
use loom::sync::Notify;

use std::sync::{Arc, Mutex};

pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
notify: Notify::new(),
Expand Down Expand Up @@ -31,15 +30,15 @@ struct Inner<T> {

impl<T> Sender<T> {
pub(crate) fn send(self, value: T) {
*self.inner.value.lock().unwrap() = Some(value);
*self.inner.value.lock() = Some(value);
self.inner.notify.notify();
}
}

impl<T> Receiver<T> {
pub(crate) fn recv(self) -> T {
loop {
if let Some(v) = self.inner.value.lock().unwrap().take() {
if let Some(v) = self.inner.value.lock().take() {
return v;
}

Expand Down
5 changes: 2 additions & 3 deletions tokio/src/sync/barrier.rs
@@ -1,7 +1,6 @@
use crate::loom::sync::Mutex;
use crate::sync::watch;

use std::sync::Mutex;

/// A barrier enables multiple tasks to synchronize the beginning of some computation.
///
/// ```
Expand Down Expand Up @@ -94,7 +93,7 @@ impl Barrier {
// NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
// a yield point, and thus marks the returned future as !Send.
let generation = {
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock();
let generation = state.generation;
state.arrived += 1;
if state.arrived == self.n {
Expand Down
9 changes: 4 additions & 5 deletions tokio/src/task/local.rs
@@ -1,4 +1,5 @@
//! Runs `!Send` futures on the current thread.
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Task};
use crate::sync::AtomicWaker;

Expand All @@ -8,7 +9,6 @@ use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;

use pin_project_lite::pin_project;
Expand Down Expand Up @@ -537,7 +537,6 @@ impl LocalSet {
.shared
.queue
.lock()
.unwrap()
.pop_front()
.or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
} else {
Expand All @@ -546,7 +545,7 @@ impl LocalSet {
.borrow_mut()
.queue
.pop_front()
.or_else(|| self.context.shared.queue.lock().unwrap().pop_front())
.or_else(|| self.context.shared.queue.lock().pop_front())
}
}

Expand Down Expand Up @@ -610,7 +609,7 @@ impl Drop for LocalSet {
task.shutdown();
}

for task in self.context.shared.queue.lock().unwrap().drain(..) {
for task in self.context.shared.queue.lock().drain(..) {
task.shutdown();
}

Expand Down Expand Up @@ -660,7 +659,7 @@ impl Shared {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().unwrap().push_back(task);
self.queue.lock().push_back(task);
self.waker.wake();
}
});
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/time/clock.rs
Expand Up @@ -29,7 +29,7 @@ cfg_not_test_util! {

cfg_test_util! {
use crate::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use crate::loom::sync::{Arc, Mutex};

cfg_rt! {
fn clock() -> Option<Clock> {
Expand Down Expand Up @@ -102,7 +102,7 @@ cfg_test_util! {
/// runtime.
pub fn resume() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock().unwrap();
let mut inner = clock.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
Expand Down Expand Up @@ -164,7 +164,7 @@ cfg_test_util! {
}

pub(crate) fn pause(&self) {
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock();

if !inner.enable_pausing {
drop(inner); // avoid poisoning the lock
Expand All @@ -178,12 +178,12 @@ cfg_test_util! {
}

pub(crate) fn is_paused(&self) -> bool {
let inner = self.inner.lock().unwrap();
let inner = self.inner.lock();
inner.unfrozen.is_none()
}

pub(crate) fn advance(&self, duration: Duration) {
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
Expand All @@ -193,7 +193,7 @@ cfg_test_util! {
}

pub(crate) fn now(&self) -> Instant {
let inner = self.inner.lock().unwrap();
let inner = self.inner.lock();

let mut ret = inner.base;

Expand Down