Skip to content

Commit

Permalink
Track closed state via the semapthore
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev committed Sep 23, 2020
1 parent 6032187 commit 0aaca66
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
18 changes: 11 additions & 7 deletions tokio/src/sync/mpsc/chan.rs
@@ -1,6 +1,5 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::{ClosedError, TryRecvError};
Expand Down Expand Up @@ -102,15 +101,14 @@ pub(crate) trait Semaphore {
fn forget(&self, permit: &mut Self::Permit);

fn close(&self);

fn closed(&self) -> bool;
}

struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped
notify_rx_closed: Notify,

/// Indicates whether the receiver has been dropped
rx_closed: AtomicBool,

/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,

Expand Down Expand Up @@ -172,7 +170,6 @@ where
let (tx, rx) = list::channel();

let chan = Arc::new(Chan {
rx_closed: AtomicBool::new(false),
notify_rx_closed: Notify::new(),
tx,
semaphore,
Expand Down Expand Up @@ -240,7 +237,7 @@ where
return;
}

if self.inner.rx_closed.load(SeqCst) {
if self.inner.semaphore.closed() {
return;
}
notified.await;
Expand Down Expand Up @@ -320,7 +317,6 @@ where
});

self.inner.semaphore.close();
self.inner.rx_closed.store(true, SeqCst);
self.inner.notify_rx_closed.notify_waiters();
}

Expand Down Expand Up @@ -519,6 +515,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
fn close(&self) {
self.0.close();
}

fn closed(&self) -> bool {
self.0.closed()
}
}

// ===== impl Semaphore for AtomicUsize =====
Expand Down Expand Up @@ -584,4 +584,8 @@ impl Semaphore for AtomicUsize {
fn close(&self) {
self.fetch_or(1, Release);
}

fn closed(&self) -> bool {
self.load(Acquire) & 1 == 1
}
}
5 changes: 5 additions & 0 deletions tokio/src/sync/semaphore_ll.rs
Expand Up @@ -177,6 +177,11 @@ impl Semaphore {
curr.available_permits()
}

/// Returns whether the semaphore is closed
pub(crate) fn closed(&self) -> bool {
SemState(self.state.load(Acquire)).is_closed()
}

/// Tries to acquire the requested number of permits, registering the waiter
/// if not enough permits are available.
fn poll_acquire(
Expand Down

0 comments on commit 0aaca66

Please sign in to comment.