chore: handle std Mutex poisoning in a shim (#2872)
Since tokio does not rely on lock poisoning, we can avoid
unwrapping on every lock by handling the `PoisonError`
in the Mutex shim.

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
zaharidichev committed Sep 25, 2020
1 parent cf025ba commit 4446606
Showing 17 changed files with 127 additions and 88 deletions.
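Most of the hunks below are mechanical: call sites drop the `.unwrap()` because the shim's `lock()` now returns the guard directly. A minimal before/after sketch of that call-site change, using a hypothetical `Shared` type rather than any actual tokio struct:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical stand-in for the tokio structs touched below.
struct Shared {
    queue: Mutex<VecDeque<u32>>,
}

impl Shared {
    // Before: std's Mutex::lock returns a LockResult, so every call site
    // has to unwrap it (and panics if the lock was ever poisoned).
    fn pop(&self) -> Option<u32> {
        self.queue.lock().unwrap().pop_front()
    }
}
```

After this commit the field would be a `crate::loom::sync::Mutex`, whose `lock()` already yields the guard, so the same body reads `self.queue.lock().pop_front()`.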
10 changes: 5 additions & 5 deletions tokio/src/io/driver/scheduled_io.rs
@@ -193,7 +193,7 @@ impl ScheduledIo {
}

pub(super) fn wake(&self, ready: mio::Ready) {
-let mut waiters = self.waiters.lock().unwrap();
+let mut waiters = self.waiters.lock();

// check for AsyncRead slot
if !(ready & (!mio::Ready::writable())).is_empty() {
@@ -241,7 +241,7 @@ impl ScheduledIo {

if ready.is_empty() {
// Update the task info
-let mut waiters = self.waiters.lock().unwrap();
+let mut waiters = self.waiters.lock();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
@@ -375,7 +375,7 @@ cfg_io_readiness! {
}

// Wasn't ready, take the lock (and check again while locked).
-let mut waiters = scheduled_io.waiters.lock().unwrap();
+let mut waiters = scheduled_io.waiters.lock();

let curr = scheduled_io.readiness.load(SeqCst);
let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
@@ -408,7 +408,7 @@ cfg_io_readiness! {
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.

-let waiters = scheduled_io.waiters.lock().unwrap();
+let waiters = scheduled_io.waiters.lock();

// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
@@ -450,7 +450,7 @@ cfg_io_readiness! {

impl Drop for Readiness<'_> {
fn drop(&mut self) {
-let mut waiters = self.scheduled_io.waiters.lock().unwrap();
+let mut waiters = self.scheduled_io.waiters.lock();

// Safety: `waiter` is only ever stored in `waiters`
unsafe {
10 changes: 5 additions & 5 deletions tokio/src/io/util/mem.rs
@@ -100,7 +100,7 @@ impl AsyncRead for DuplexStream {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
-Pin::new(&mut *self.read.lock().unwrap()).poll_read(cx, buf)
+Pin::new(&mut *self.read.lock()).poll_read(cx, buf)
}
}

@@ -111,30 +111,30 @@ impl AsyncWrite for DuplexStream {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
-Pin::new(&mut *self.write.lock().unwrap()).poll_write(cx, buf)
+Pin::new(&mut *self.write.lock()).poll_write(cx, buf)
}

#[allow(unused_mut)]
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
-Pin::new(&mut *self.write.lock().unwrap()).poll_flush(cx)
+Pin::new(&mut *self.write.lock()).poll_flush(cx)
}

#[allow(unused_mut)]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
-Pin::new(&mut *self.write.lock().unwrap()).poll_shutdown(cx)
+Pin::new(&mut *self.write.lock()).poll_shutdown(cx)
}
}

impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
-self.write.lock().unwrap().close();
+self.write.lock().close();
}
}

27 changes: 27 additions & 0 deletions tokio/src/loom/mocked.rs
@@ -1,5 +1,32 @@
pub(crate) use loom::*;

pub(crate) mod sync {

pub(crate) use loom::sync::MutexGuard;

#[derive(Debug)]
pub(crate) struct Mutex<T>(loom::sync::Mutex<T>);

#[allow(dead_code)]
impl<T> Mutex<T> {
#[inline]
pub(crate) fn new(t: T) -> Mutex<T> {
Mutex(loom::sync::Mutex::new(t))
}

#[inline]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock().unwrap()
}

#[inline]
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
self.0.try_lock().ok()
}
}
pub(crate) use loom::sync::*;
}

pub(crate) mod rand {
pub(crate) fn seed() -> u64 {
1
8 changes: 5 additions & 3 deletions tokio/src/loom/std/mod.rs
@@ -6,6 +6,7 @@ mod atomic_u32;
mod atomic_u64;
mod atomic_u8;
mod atomic_usize;
mod mutex;
#[cfg(feature = "parking_lot")]
mod parking_lot;
mod unsafe_cell;
@@ -62,9 +63,10 @@ pub(crate) mod sync {

#[cfg(not(feature = "parking_lot"))]
#[allow(unused_imports)]
-pub(crate) use std::sync::{
-Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult,
-};
+pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult};

#[cfg(not(feature = "parking_lot"))]
pub(crate) use crate::loom::std::mutex::Mutex;

pub(crate) mod atomic {
pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
31 changes: 31 additions & 0 deletions tokio/src/loom/std/mutex.rs
@@ -0,0 +1,31 @@
use std::sync::{self, MutexGuard, TryLockError};

/// Adapter for `std::Mutex` that removes the poisoning aspects
/// from its API.
#[derive(Debug)]
pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);

#[allow(dead_code)]
impl<T> Mutex<T> {
#[inline]
pub(crate) fn new(t: T) -> Mutex<T> {
Mutex(sync::Mutex::new(t))
}

#[inline]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
match self.0.lock() {
Ok(guard) => guard,
Err(p_err) => p_err.into_inner(),
}
}

#[inline]
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
match self.0.try_lock() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}
}
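
As a quick illustration of what the shim buys, here is a self-contained sketch (restating the wrapper above outside tokio) showing that even after a thread panics while holding the lock, which poisons the underlying `std` mutex, a later `lock()` still hands back a usable guard rather than a `PoisonError`:

```rust
use std::sync::{self, Arc, MutexGuard};
use std::thread;

// Same shape as the shim above: recover the guard from a poisoned lock.
struct Mutex<T: ?Sized>(sync::Mutex<T>);

impl<T> Mutex<T> {
    fn new(t: T) -> Mutex<T> {
        Mutex(sync::Mutex::new(t))
    }

    fn lock(&self) -> MutexGuard<'_, T> {
        match self.0.lock() {
            Ok(guard) => guard,
            Err(p_err) => p_err.into_inner(),
        }
    }
}

fn main() {
    let m = Arc::new(Mutex::new(0_i32));

    // Panic while holding the lock; this poisons the underlying std mutex.
    let m2 = Arc::clone(&m);
    let _ = thread::spawn(move || {
        let _guard = m2.lock();
        panic!("poison the mutex");
    })
    .join();

    // The shim ignores the poison and still returns the guard.
    *m.lock() += 1;
    assert_eq!(*m.lock(), 1);
}
```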
13 changes: 5 additions & 8 deletions tokio/src/loom/std/parking_lot.rs
@@ -3,7 +3,7 @@
//!
//! This can be extended to additional types/methods as required.

-use std::sync::{LockResult, TryLockError, TryLockResult};
+use std::sync::LockResult;
use std::time::Duration;

// Types that do not need wrapping
@@ -34,16 +34,13 @@ impl<T> Mutex<T> {
}

#[inline]
-pub(crate) fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
-Ok(self.0.lock())
+pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
+self.0.lock()
}

#[inline]
-pub(crate) fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
-match self.0.try_lock() {
-Some(guard) => Ok(guard),
-None => Err(TryLockError::WouldBlock),
-}
+pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
+self.0.try_lock()
}

// Note: Additional methods `is_poisoned` and `into_inner`, can be
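
In the `parking_lot`-backed adapter the change is mostly deletion: `parking_lot::Mutex::lock()` already returns the guard directly (there is no poisoning), and `try_lock()` already returns an `Option`, so both methods can simply forward to the inner mutex. A tiny sketch, assuming the `parking_lot` crate as a dependency:

```rust
use parking_lot::Mutex;

fn main() {
    let m = Mutex::new(0_i32);

    // No LockResult to unwrap: lock() hands back the guard.
    *m.lock() += 1;

    // try_lock() is already Option-shaped.
    if let Some(mut guard) = m.try_lock() {
        *guard += 1;
    }

    assert_eq!(*m.lock(), 2);
}
```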
6 changes: 3 additions & 3 deletions tokio/src/park/thread.rs
@@ -87,7 +87,7 @@ impl Inner {
}

// Otherwise we need to coordinate going to sleep
-let mut m = self.mutex.lock().unwrap();
+let mut m = self.mutex.lock();

match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
@@ -137,7 +137,7 @@ impl Inner {
return;
}

-let m = self.mutex.lock().unwrap();
+let m = self.mutex.lock();

match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
@@ -188,7 +188,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
-drop(self.mutex.lock().unwrap());
+drop(self.mutex.lock());

self.condvar.notify_one()
}
25 changes: 7 additions & 18 deletions tokio/src/runtime/basic_scheduler.rs
@@ -10,7 +10,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::sync::{Arc, PoisonError};
+use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;

@@ -170,7 +170,7 @@ impl<P: Park> BasicScheduler<P> {
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
-let inner = self.inner.lock().unwrap().take()?;
+let inner = self.inner.lock().take()?;

Some(InnerGuard {
inner: Some(inner),
@@ -280,12 +280,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.

-let mut inner = match self
-.inner
-.lock()
-.unwrap_or_else(PoisonError::into_inner)
-.take()
-{
+let mut inner = match self.inner.lock().take() {
Some(inner) => inner,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
@@ -309,7 +304,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
}

// Drain remote queue
-for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
+for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
}

@@ -339,7 +334,7 @@ impl Spawner {
}

fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
-self.shared.queue.lock().unwrap().pop_front()
+self.shared.queue.lock().pop_front()
}

fn waker_ref(&self) -> WakerRef<'_> {
@@ -384,7 +379,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
-self.queue.lock().unwrap().push_back(task);
+self.queue.lock().push_back(task);
self.unpark.unpark();
}
});
@@ -423,13 +418,7 @@ impl<P: Park> InnerGuard<'_, P> {
impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
-// We can ignore the poison error here since we are
-// just replacing the state.
-let mut lock = self
-.basic_scheduler
-.inner
-.lock()
-.unwrap_or_else(PoisonError::into_inner)
+let mut lock = self.basic_scheduler.inner.lock();

// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
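
The `Drop` impl above still guards against a double panic with `std::thread::panicking()`: if the inner state is missing while the thread is already unwinding, panicking again inside `drop` would abort the process. A small standalone sketch of that guard, with a hypothetical `Guarded` type:

```rust
struct Guarded {
    state: Option<u32>,
}

impl Drop for Guarded {
    fn drop(&mut self) {
        match self.state.take() {
            Some(state) => {
                // Normal shutdown path.
                let _ = state;
            }
            // Already unwinding: a second panic here would abort the process,
            // so bail out quietly.
            None if std::thread::panicking() => return,
            // Not unwinding, so a missing state really is a bug.
            None => panic!("state was never put back, this is a bug"),
        }
    }
}

fn main() {
    let _g = Guarded { state: None };
    // This panic starts unwinding; `_g` is dropped during the unwind, notices
    // thread::panicking(), and returns instead of double-panicking.
    panic!("original panic");
}
```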
12 changes: 6 additions & 6 deletions tokio/src/runtime/blocking/pool.rs
@@ -129,7 +129,7 @@ impl BlockingPool {
}

pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
-let mut shared = self.spawner.inner.shared.lock().unwrap();
+let mut shared = self.spawner.inner.shared.lock();

// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
@@ -170,7 +170,7 @@ impl fmt::Debug for BlockingPool {
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
-let mut shared = self.inner.shared.lock().unwrap();
+let mut shared = self.inner.shared.lock();

if shared.shutdown {
// Shutdown the task
@@ -207,7 +207,7 @@ impl Spawner {
};

if let Some(shutdown_tx) = shutdown_tx {
-let mut shared = self.inner.shared.lock().unwrap();
+let mut shared = self.inner.shared.lock();
let entry = shared.worker_threads.vacant_entry();

let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
@@ -251,15 +251,15 @@ impl Inner {
f()
}

-let mut shared = self.shared.lock().unwrap();
+let mut shared = self.shared.lock();

'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
drop(shared);
task.run();

-shared = self.shared.lock().unwrap();
+shared = self.shared.lock();
}

// IDLE
@@ -296,7 +296,7 @@ impl Inner {
drop(shared);
task.shutdown();

-shared = self.shared.lock().unwrap();
+shared = self.shared.lock();
}

// Work was produced, and we "took" it (by decrementing num_notify).
4 changes: 2 additions & 2 deletions tokio/src/runtime/park.rs
@@ -142,7 +142,7 @@ impl Inner {

fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
-let mut m = self.mutex.lock().unwrap();
+let mut m = self.mutex.lock();

match self
.state
@@ -238,7 +238,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
-drop(self.mutex.lock().unwrap());
+drop(self.mutex.lock());

self.condvar.notify_one()
}
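
The comment kept in the hunk above describes a classic condvar hand-off: the unparking thread briefly takes and releases the mutex before `notify_one`, so the notification cannot fire between the parked thread's state check and its `condvar.wait`, and the woken thread does not immediately block on a lock the notifier still holds. A simplified, self-contained sketch of the pattern (plain `std` mutex here, so the `unwrap()`s reappear):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Condvar, Mutex};

const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;

pub struct Parker {
    state: AtomicUsize,
    mutex: Mutex<()>,
    condvar: Condvar,
}

impl Parker {
    pub fn new() -> Parker {
        Parker {
            state: AtomicUsize::new(EMPTY),
            mutex: Mutex::new(()),
            condvar: Condvar::new(),
        }
    }

    pub fn park(&self) {
        // Coordinate going to sleep while holding the mutex.
        let mut guard = self.mutex.lock().unwrap();

        // If an unpark already happened, consume it and return immediately.
        if self
            .state
            .compare_exchange(EMPTY, PARKED, SeqCst, SeqCst)
            .is_err()
        {
            self.state.store(EMPTY, SeqCst);
            return;
        }

        // Sleep until unpark() moves the state to NOTIFIED.
        while self.state.load(SeqCst) == PARKED {
            guard = self.condvar.wait(guard).unwrap();
        }
        self.state.store(EMPTY, SeqCst);
    }

    pub fn unpark(&self) {
        if self.state.swap(NOTIFIED, SeqCst) == PARKED {
            // Take and drop the lock before notify_one: the parked thread is
            // either not yet inside wait() (and will see NOTIFIED before it
            // sleeps, because it still holds the mutex) or is already waiting
            // and can be woken without then blocking on a mutex we hold.
            drop(self.mutex.lock().unwrap());
            self.condvar.notify_one();
        }
    }
}
```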
