Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: handle std Mutex poisoning in a shim #2872

Merged
merged 4 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl ScheduledIo {
}

pub(super) fn wake(&self, ready: mio::Ready) {
let mut waiters = self.waiters.lock().unwrap();
let mut waiters = self.waiters.lock();

// check for AsyncRead slot
if !(ready & (!mio::Ready::writable())).is_empty() {
Expand Down Expand Up @@ -241,7 +241,7 @@ impl ScheduledIo {

if ready.is_empty() {
// Update the task info
let mut waiters = self.waiters.lock().unwrap();
let mut waiters = self.waiters.lock();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
Expand Down Expand Up @@ -375,7 +375,7 @@ cfg_io_readiness! {
}

// Wasn't ready, take the lock (and check again while locked).
let mut waiters = scheduled_io.waiters.lock().unwrap();
let mut waiters = scheduled_io.waiters.lock();

let curr = scheduled_io.readiness.load(SeqCst);
let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
Expand Down Expand Up @@ -408,7 +408,7 @@ cfg_io_readiness! {
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.

let waiters = scheduled_io.waiters.lock().unwrap();
let waiters = scheduled_io.waiters.lock();

// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
Expand Down Expand Up @@ -450,7 +450,7 @@ cfg_io_readiness! {

impl Drop for Readiness<'_> {
fn drop(&mut self) {
let mut waiters = self.scheduled_io.waiters.lock().unwrap();
let mut waiters = self.scheduled_io.waiters.lock();

// Safety: `waiter` is only ever stored in `waiters`
unsafe {
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl AsyncRead for DuplexStream {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut *self.read.lock().unwrap()).poll_read(cx, buf)
Pin::new(&mut *self.read.lock()).poll_read(cx, buf)
}
}

Expand All @@ -111,30 +111,30 @@ impl AsyncWrite for DuplexStream {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut *self.write.lock().unwrap()).poll_write(cx, buf)
Pin::new(&mut *self.write.lock()).poll_write(cx, buf)
}

#[allow(unused_mut)]
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut *self.write.lock().unwrap()).poll_flush(cx)
Pin::new(&mut *self.write.lock()).poll_flush(cx)
}

#[allow(unused_mut)]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut *self.write.lock().unwrap()).poll_shutdown(cx)
Pin::new(&mut *self.write.lock()).poll_shutdown(cx)
}
}

impl Drop for DuplexStream {
fn drop(&mut self) {
// notify the other side of the closure
self.write.lock().unwrap().close();
self.write.lock().close();
}
}

Expand Down
27 changes: 27 additions & 0 deletions tokio/src/loom/mocked.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
pub(crate) use loom::*;

pub(crate) mod sync {

pub(crate) use loom::sync::MutexGuard;

#[derive(Debug)]
pub(crate) struct Mutex<T>(loom::sync::Mutex<T>);

#[allow(dead_code)]
impl<T> Mutex<T> {
#[inline]
pub(crate) fn new(t: T) -> Mutex<T> {
Mutex(loom::sync::Mutex::new(t))
}

#[inline]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock().unwrap()
}
Comment on lines +17 to +20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't really seem correct? Shouldn't it use into_inner? Or maybe it's good to panic on poisoning in loom tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poisoning in loom is good because we should never hit it.


#[inline]
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
self.0.try_lock().ok()
}
}
pub(crate) use loom::sync::*;
}

pub(crate) mod rand {
pub(crate) fn seed() -> u64 {
1
Expand Down
8 changes: 5 additions & 3 deletions tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod atomic_u32;
mod atomic_u64;
mod atomic_u8;
mod atomic_usize;
mod mutex;
#[cfg(feature = "parking_lot")]
mod parking_lot;
mod unsafe_cell;
Expand Down Expand Up @@ -62,9 +63,10 @@ pub(crate) mod sync {

#[cfg(not(feature = "parking_lot"))]
#[allow(unused_imports)]
pub(crate) use std::sync::{
Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult,
};
pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult};

#[cfg(not(feature = "parking_lot"))]
pub(crate) use crate::loom::std::mutex::Mutex;

pub(crate) mod atomic {
pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
Expand Down
31 changes: 31 additions & 0 deletions tokio/src/loom/std/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::sync::{self, MutexGuard, TryLockError};

/// Adapter for `std::Mutex` that removes the poisoning aspects
// from its api
#[derive(Debug)]
pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);

#[allow(dead_code)]
impl<T> Mutex<T> {
#[inline]
pub(crate) fn new(t: T) -> Mutex<T> {
Mutex(sync::Mutex::new(t))
}

#[inline]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
match self.0.lock() {
Ok(guard) => guard,
Err(p_err) => p_err.into_inner(),
}
}

#[inline]
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
match self.0.try_lock() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}
}
13 changes: 5 additions & 8 deletions tokio/src/loom/std/parking_lot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//!
//! This can be extended to additional types/methods as required.

use std::sync::{LockResult, TryLockError, TryLockResult};
use std::sync::LockResult;
use std::time::Duration;

// Types that do not need wrapping
Expand Down Expand Up @@ -34,16 +34,13 @@ impl<T> Mutex<T> {
}

#[inline]
pub(crate) fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
Ok(self.0.lock())
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock()
}

#[inline]
pub(crate) fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
match self.0.try_lock() {
Some(guard) => Ok(guard),
None => Err(TryLockError::WouldBlock),
}
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
self.0.try_lock()
}

// Note: Additional methods `is_poisoned` and `into_inner`, can be
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Inner {
}

// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock().unwrap();
let mut m = self.mutex.lock();

match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Inner {
return;
}

let m = self.mutex.lock().unwrap();
let m = self.mutex.lock();

match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Expand Down Expand Up @@ -188,7 +188,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock().unwrap());
drop(self.mutex.lock());

self.condvar.notify_one()
}
Expand Down
25 changes: 7 additions & 18 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, PoisonError};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;

Expand Down Expand Up @@ -170,7 +170,7 @@ impl<P: Park> BasicScheduler<P> {
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let inner = self.inner.lock().unwrap().take()?;
let inner = self.inner.lock().take()?;

Some(InnerGuard {
inner: Some(inner),
Expand Down Expand Up @@ -280,12 +280,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.

let mut inner = match self
.inner
.lock()
.unwrap_or_else(PoisonError::into_inner)
.take()
{
let mut inner = match self.inner.lock().take() {
Some(inner) => inner,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
Expand All @@ -309,7 +304,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
}

// Drain remote queue
for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
}

Expand Down Expand Up @@ -339,7 +334,7 @@ impl Spawner {
}

fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().unwrap().pop_front()
self.shared.queue.lock().pop_front()
}

fn waker_ref(&self) -> WakerRef<'_> {
Expand Down Expand Up @@ -384,7 +379,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().unwrap().push_back(task);
self.queue.lock().push_back(task);
self.unpark.unpark();
}
});
Expand Down Expand Up @@ -423,13 +418,7 @@ impl<P: Park> InnerGuard<'_, P> {
impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
// We can ignore the poison error here since we are
// just replacing the state.
let mut lock = self
.basic_scheduler
.inner
.lock()
.unwrap_or_else(PoisonError::into_inner);
let mut lock = self.basic_scheduler.inner.lock();

// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl BlockingPool {
}

pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
let mut shared = self.spawner.inner.shared.lock().unwrap();
let mut shared = self.spawner.inner.shared.lock();

// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
Expand Down Expand Up @@ -170,7 +170,7 @@ impl fmt::Debug for BlockingPool {
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();
let mut shared = self.inner.shared.lock();

if shared.shutdown {
// Shutdown the task
Expand Down Expand Up @@ -207,7 +207,7 @@ impl Spawner {
};

if let Some(shutdown_tx) = shutdown_tx {
let mut shared = self.inner.shared.lock().unwrap();
let mut shared = self.inner.shared.lock();
let entry = shared.worker_threads.vacant_entry();

let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
Expand Down Expand Up @@ -251,15 +251,15 @@ impl Inner {
f()
}

let mut shared = self.shared.lock().unwrap();
let mut shared = self.shared.lock();

'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
drop(shared);
task.run();

shared = self.shared.lock().unwrap();
shared = self.shared.lock();
}

// IDLE
Expand Down Expand Up @@ -296,7 +296,7 @@ impl Inner {
drop(shared);
task.shutdown();

shared = self.shared.lock().unwrap();
shared = self.shared.lock();
}

// Work was produced, and we "took" it (by decrementing num_notify).
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl Inner {

fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock().unwrap();
let mut m = self.mutex.lock();

match self
.state
Expand Down Expand Up @@ -238,7 +238,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock().unwrap());
drop(self.mutex.lock());

self.condvar.notify_one()
}
Expand Down