Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add mpsc::Sender::closed future #2840

Merged
merged 6 commits into from Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -320,6 +320,41 @@ impl<T> Sender<T> {
}
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx) = mpsc::channel::<()>(1);
/// let mut tx2 = tx1.clone();
/// let mut tx3 = tx1.clone();
/// let mut tx4 = tx1.clone();
/// let mut tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
///
/// futures::join!(
/// tx1.closed(),
/// tx2.closed(),
/// tx3.closed(),
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&mut self) {
self.chan.closed().await
}

/// Attempts to immediately send a message on this `Sender`
///
/// This method differs from [`send`] by returning immediately if the channel's
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
use crate::sync::Notify;

use std::fmt;
use std::process;
Expand Down Expand Up @@ -44,6 +45,9 @@ pub(crate) trait Semaphore {
}

struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped
notify_rx_closed: Notify,

/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,

Expand Down Expand Up @@ -102,6 +106,7 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
let (tx, rx) = list::channel();

let chan = Arc::new(Chan {
notify_rx_closed: Notify::new(),
tx,
semaphore,
rx_waker: AtomicWaker::new(),
Expand Down Expand Up @@ -137,6 +142,35 @@ impl<T, S> Tx<T, S> {
}
}

impl<T, S: Semaphore> Tx<T, S> {
pub(crate) async fn closed(&mut self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped. Requesting the notification
// requires polling the future once.
let notified = self.inner.notify_rx_closed.notified();
pin!(notified);

// Polling the future once is guaranteed to return `Pending` as `watch`
// only notifies using `notify_waiters`.
crate::future::poll_fn(|cx| {
let res = Pin::new(&mut notified).poll(cx);
assert!(!res.is_ready());
Poll::Ready(())
})
.await;

if self.inner.semaphore.is_closed() {
return;
}
notified.await;
}
}

impl<T, S> Clone for Tx<T, S> {
fn clone(&self) -> Tx<T, S> {
// Using a Relaxed ordering here is sufficient as the caller holds a
Expand Down Expand Up @@ -182,6 +216,7 @@ impl<T, S: Semaphore> Rx<T, S> {
});

self.inner.semaphore.close();
self.inner.notify_rx_closed.notify_waiters();
}

/// Receive the next value
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Expand Up @@ -210,4 +210,39 @@ impl<T> UnboundedSender<T> {
}
}
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
/// let mut tx2 = tx1.clone();
/// let mut tx3 = tx1.clone();
/// let mut tx4 = tx1.clone();
/// let mut tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
///
/// futures::join!(
/// tx1.closed(),
/// tx2.closed(),
/// tx3.closed(),
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&mut self) {
self.chan.closed().await
}
}
28 changes: 28 additions & 0 deletions tokio/src/sync/tests/loom_mpsc.rs
Expand Up @@ -40,6 +40,34 @@ fn closing_unbounded_tx() {
});
}

#[test]
fn closing_bounded_rx() {
loom::model(|| {
let (mut tx1, rx) = mpsc::channel::<()>(16);
let mut tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});

block_on(tx1.closed());
block_on(tx2.closed());
});
}

#[test]
fn closing_unbounded_rx() {
loom::model(|| {
let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
let mut tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});

block_on(tx1.closed());
block_on(tx2.closed());
});
}

#[test]
fn dropping_tx() {
loom::model(|| {
Expand Down