diff --git a/crossbeam-channel/src/flavors/zero.rs b/crossbeam-channel/src/flavors/zero.rs index 2208d12c1..31e62afac 100644 --- a/crossbeam-channel/src/flavors/zero.rs +++ b/crossbeam-channel/src/flavors/zero.rs @@ -5,6 +5,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; use std::time::Instant; use std::{fmt, ptr}; @@ -13,7 +14,6 @@ use crossbeam_utils::Backoff; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; -use crate::utils::Spinlock; use crate::waker::Waker; /// A pointer to a packet. @@ -95,7 +95,7 @@ struct Inner { /// Zero-capacity channel. pub(crate) struct Channel { /// Inner representation of the channel. - inner: Spinlock, + inner: Mutex, /// Indicates that dropping a `Channel` may drop values of type `T`. _marker: PhantomData, @@ -105,7 +105,7 @@ impl Channel { /// Constructs a new zero-capacity channel. pub(crate) fn new() -> Self { Channel { - inner: Spinlock::new(Inner { + inner: Mutex::new(Inner { senders: Waker::new(), receivers: Waker::new(), is_disconnected: false, @@ -126,7 +126,7 @@ impl Channel { /// Attempts to reserve a slot for sending a message. fn start_send(&self, token: &mut Token) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -155,7 +155,7 @@ impl Channel { /// Attempts to pair up with a sender. fn start_recv(&self, token: &mut Token) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -198,7 +198,7 @@ impl Channel { /// Attempts to send a message into the channel. pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -222,7 +222,7 @@ impl Channel { deadline: Option, ) -> Result<(), SendTimeoutError> { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting receiver, pair up with it. if let Some(operation) = inner.receivers.try_select() { @@ -254,12 +254,12 @@ impl Channel { match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { - self.inner.lock().senders.unregister(oper).unwrap(); + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); let msg = unsafe { packet.msg.get().replace(None).unwrap() }; Err(SendTimeoutError::Timeout(msg)) } Selected::Disconnected => { - self.inner.lock().senders.unregister(oper).unwrap(); + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); let msg = unsafe { packet.msg.get().replace(None).unwrap() }; Err(SendTimeoutError::Disconnected(msg)) } @@ -275,7 +275,7 @@ impl Channel { /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -292,7 +292,7 @@ impl Channel { /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option) -> Result { let token = &mut Token::default(); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); // If there's a waiting sender, pair up with it. if let Some(operation) = inner.senders.try_select() { @@ -325,11 +325,21 @@ impl Channel { match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { - self.inner.lock().receivers.unregister(oper).unwrap(); + self.inner + .lock() + .unwrap() + .receivers + .unregister(oper) + .unwrap(); Err(RecvTimeoutError::Timeout) } Selected::Disconnected => { - self.inner.lock().receivers.unregister(oper).unwrap(); + self.inner + .lock() + .unwrap() + .receivers + .unregister(oper) + .unwrap(); Err(RecvTimeoutError::Disconnected) } Selected::Operation(_) => { @@ -345,7 +355,7 @@ impl Channel { /// /// Returns `true` if this call disconnected the channel. pub(crate) fn disconnect(&self) -> bool { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); if !inner.is_disconnected { inner.is_disconnected = true; @@ -396,7 +406,7 @@ impl SelectHandle for Receiver<'_, T> { fn register(&self, oper: Operation, cx: &Context) -> bool { let packet = Box::into_raw(Packet::::empty_on_heap()); - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner .receivers .register_with_packet(oper, packet as *mut (), cx); @@ -405,7 +415,7 @@ impl SelectHandle for Receiver<'_, T> { } fn unregister(&self, oper: Operation) { - if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) { + if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { unsafe { drop(Box::from_raw(operation.packet as *mut Packet)); } @@ -418,18 +428,18 @@ impl SelectHandle for Receiver<'_, T> { } fn is_ready(&self) -> bool { - let inner = self.0.inner.lock(); + let inner = self.0.inner.lock().unwrap(); inner.senders.can_select() || inner.is_disconnected } fn watch(&self, oper: Operation, cx: &Context) -> bool { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.receivers.watch(oper, cx); inner.senders.can_select() || inner.is_disconnected } fn unwatch(&self, oper: Operation) { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.receivers.unwatch(oper); } } @@ -446,7 +456,7 @@ impl SelectHandle for Sender<'_, T> { fn register(&self, oper: Operation, cx: &Context) -> bool { let packet = Box::into_raw(Packet::::empty_on_heap()); - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner .senders .register_with_packet(oper, packet as *mut (), cx); @@ -455,7 +465,7 @@ impl SelectHandle for Sender<'_, T> { } fn unregister(&self, oper: Operation) { - if let Some(operation) = self.0.inner.lock().senders.unregister(oper) { + if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { unsafe { drop(Box::from_raw(operation.packet as *mut Packet)); } @@ -468,18 +478,18 @@ impl SelectHandle for Sender<'_, T> { } fn is_ready(&self) -> bool { - let inner = self.0.inner.lock(); + let inner = self.0.inner.lock().unwrap(); inner.receivers.can_select() || inner.is_disconnected } fn watch(&self, oper: Operation, cx: &Context) -> bool { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.senders.watch(oper, cx); inner.receivers.can_select() || inner.is_disconnected } fn unwatch(&self, oper: Operation) { - let mut inner = self.0.inner.lock(); + let mut inner = self.0.inner.lock().unwrap(); inner.senders.unwatch(oper); } } diff --git a/crossbeam-channel/src/utils.rs b/crossbeam-channel/src/utils.rs index d87c2408c..9f14c8e65 100644 --- a/crossbeam-channel/src/utils.rs +++ b/crossbeam-channel/src/utils.rs @@ -1,14 +1,10 @@ //! Miscellaneous utilities. -use std::cell::{Cell, UnsafeCell}; +use std::cell::Cell; use std::num::Wrapping; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, Instant}; -use crossbeam_utils::Backoff; - /// Randomly shuffles a slice. pub(crate) fn shuffle(v: &mut [T]) { let len = v.len(); @@ -68,53 +64,3 @@ pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant { None => Instant::now() + Duration::from_secs(86400 * 365 * 30), } } - -/// A simple spinlock. -pub(crate) struct Spinlock { - flag: AtomicBool, - value: UnsafeCell, -} - -impl Spinlock { - /// Returns a new spinlock initialized with `value`. - pub(crate) fn new(value: T) -> Spinlock { - Spinlock { - flag: AtomicBool::new(false), - value: UnsafeCell::new(value), - } - } - - /// Locks the spinlock. - pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> { - let backoff = Backoff::new(); - while self.flag.swap(true, Ordering::Acquire) { - backoff.snooze(); - } - SpinlockGuard { parent: self } - } -} - -/// A guard holding a spinlock locked. -pub(crate) struct SpinlockGuard<'a, T> { - parent: &'a Spinlock, -} - -impl Drop for SpinlockGuard<'_, T> { - fn drop(&mut self) { - self.parent.flag.store(false, Ordering::Release); - } -} - -impl Deref for SpinlockGuard<'_, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl DerefMut for SpinlockGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } - } -} diff --git a/crossbeam-channel/src/waker.rs b/crossbeam-channel/src/waker.rs index ee445ec8c..7eb58ba7f 100644 --- a/crossbeam-channel/src/waker.rs +++ b/crossbeam-channel/src/waker.rs @@ -2,11 +2,11 @@ use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; use std::thread::{self, ThreadId}; use crate::context::Context; use crate::select::{Operation, Selected}; -use crate::utils::Spinlock; /// Represents a thread blocked on a specific channel operation. pub(crate) struct Entry { @@ -176,7 +176,7 @@ impl Drop for Waker { /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. pub(crate) struct SyncWaker { /// The inner `Waker`. - inner: Spinlock, + inner: Mutex, /// `true` if the waker is empty. is_empty: AtomicBool, @@ -187,7 +187,7 @@ impl SyncWaker { #[inline] pub(crate) fn new() -> Self { SyncWaker { - inner: Spinlock::new(Waker::new()), + inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true), } } @@ -195,7 +195,7 @@ impl SyncWaker { /// Registers the current thread with an operation. #[inline] pub(crate) fn register(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.register(oper, cx); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -206,7 +206,7 @@ impl SyncWaker { /// Unregisters an operation previously registered by the current thread. #[inline] pub(crate) fn unregister(&self, oper: Operation) -> Option { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); let entry = inner.unregister(oper); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -219,7 +219,7 @@ impl SyncWaker { #[inline] pub(crate) fn notify(&self) { if !self.is_empty.load(Ordering::SeqCst) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); if !self.is_empty.load(Ordering::SeqCst) { inner.try_select(); inner.notify(); @@ -234,7 +234,7 @@ impl SyncWaker { /// Registers an operation waiting to be ready. #[inline] pub(crate) fn watch(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.watch(oper, cx); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -245,7 +245,7 @@ impl SyncWaker { /// Unregisters an operation waiting to be ready. #[inline] pub(crate) fn unwatch(&self, oper: Operation) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.unwatch(oper); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(), @@ -256,7 +256,7 @@ impl SyncWaker { /// Notifies all threads that the channel is disconnected. #[inline] pub(crate) fn disconnect(&self) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().unwrap(); inner.disconnect(); self.is_empty.store( inner.selectors.is_empty() && inner.observers.is_empty(),