Skip to content

Commit

Permalink
sync: add mpsc::Sender::closed future (#2840)
Browse files Browse the repository at this point in the history
Adding closed future, makes it possible to select over closed and some other
work, so that the task is woken when the channel is closed and can proactively
cancel itself.

Added a mpsc::Sender::closed future that will become ready when the receiver
is closed.
  • Loading branch information
zaharidichev committed Sep 25, 2020
1 parent 4446606 commit 55d932a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -320,6 +320,41 @@ impl<T> Sender<T> {
}
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx) = mpsc::channel::<()>(1);
/// let mut tx2 = tx1.clone();
/// let mut tx3 = tx1.clone();
/// let mut tx4 = tx1.clone();
/// let mut tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
///
/// futures::join!(
/// tx1.closed(),
/// tx2.closed(),
/// tx3.closed(),
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&mut self) {
self.chan.closed().await
}

/// Attempts to immediately send a message on this `Sender`
///
/// This method differs from [`send`] by returning immediately if the channel's
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
use crate::sync::Notify;

use std::fmt;
use std::process;
Expand Down Expand Up @@ -44,6 +45,9 @@ pub(crate) trait Semaphore {
}

struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped
notify_rx_closed: Notify,

/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,

Expand Down Expand Up @@ -102,6 +106,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();

let chan = Arc::new(Chan {
notify_rx_closed: Notify::new(),
tx,
semaphore,
rx_waker: AtomicWaker::new(),
Expand Down Expand Up @@ -137,6 +142,35 @@ impl<T, S> Tx<T, S> {
}
}

impl<T, S: Semaphore> Tx<T, S> {
pub(crate) async fn closed(&mut self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped. Requesting the notification
// requires polling the future once.
let notified = self.inner.notify_rx_closed.notified();
pin!(notified);

// Polling the future once is guaranteed to return `Pending` as `watch`
// only notifies using `notify_waiters`.
crate::future::poll_fn(|cx| {
let res = Pin::new(&mut notified).poll(cx);
assert!(!res.is_ready());
Poll::Ready(())
})
.await;

if self.inner.semaphore.is_closed() {
return;
}
notified.await;
}
}

impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
Expand Down Expand Up @@ -182,6 +216,7 @@ impl<T, S: Semaphore> Rx<T, S> {
});

self.inner.semaphore.close();
self.inner.notify_rx_closed.notify_waiters();
}

/// Receive the next value
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Expand Up @@ -210,4 +210,39 @@ impl<T> UnboundedSender<T> {
}
}
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
/// let mut tx2 = tx1.clone();
/// let mut tx3 = tx1.clone();
/// let mut tx4 = tx1.clone();
/// let mut tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
///
/// futures::join!(
/// tx1.closed(),
/// tx2.closed(),
/// tx3.closed(),
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&mut self) {
self.chan.closed().await
}
}
28 changes: 28 additions & 0 deletions tokio/src/sync/tests/loom_mpsc.rs
Expand Up @@ -40,6 +40,34 @@ fn closing_unbounded_tx() {
});
}

#[test]
fn closing_bounded_rx() {
loom::model(|| {
let (mut tx1, rx) = mpsc::channel::<()>(16);
let mut tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});

block_on(tx1.closed());
block_on(tx2.closed());
});
}

#[test]
fn closing_unbounded_rx() {
loom::model(|| {
let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
let mut tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});

block_on(tx1.closed());
block_on(tx2.closed());
});
}

#[test]
fn dropping_tx() {
loom::model(|| {
Expand Down

0 comments on commit 55d932a

Please sign in to comment.