From 55d932a21fd4c5fa298ca3cfdcb1388dbbf43dd0 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 25 Sep 2020 18:40:31 +0300 Subject: [PATCH] sync: add `mpsc::Sender::closed` future (#2840) Adding closed future, makes it possible to select over closed and some other work, so that the task is woken when the channel is closed and can proactively cancel itself. Added a mpsc::Sender::closed future that will become ready when the receiver is closed. --- tokio/src/sync/mpsc/bounded.rs | 35 +++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/chan.rs | 35 +++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/unbounded.rs | 35 +++++++++++++++++++++++++++++++ tokio/src/sync/tests/loom_mpsc.rs | 28 +++++++++++++++++++++++++ 4 files changed, 133 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 2d2006d5883..542eae2782b 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -320,6 +320,41 @@ impl Sender { } } + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx1, rx) = mpsc::channel::<()>(1); + /// let mut tx2 = tx1.clone(); + /// let mut tx3 = tx1.clone(); + /// let mut tx4 = tx1.clone(); + /// let mut tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&mut self) { + self.chan.closed().await + } + /// Attempts to immediately send a message on this `Sender` /// /// This method differs from [`send`] by returning immediately if the channel's diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 2d3f014996a..e7b951ed733 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -4,6 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::list; +use crate::sync::Notify; use std::fmt; use std::process; @@ -44,6 +45,9 @@ pub(crate) trait Semaphore { } struct Chan { + /// Notifies all tasks listening for the receiver being dropped + notify_rx_closed: Notify, + /// Handle to the push half of the lock-free list. tx: list::Tx, @@ -102,6 +106,7 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { + notify_rx_closed: Notify::new(), tx, semaphore, rx_waker: AtomicWaker::new(), @@ -137,6 +142,35 @@ impl Tx { } } +impl Tx { + pub(crate) async fn closed(&mut self) { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. Requesting the notification + // requires polling the future once. + let notified = self.inner.notify_rx_closed.notified(); + pin!(notified); + + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) + }) + .await; + + if self.inner.semaphore.is_closed() { + return; + } + notified.await; + } +} + impl Clone for Tx { fn clone(&self) -> Tx { // Using a Relaxed ordering here is sufficient as the caller holds a @@ -182,6 +216,7 @@ impl Rx { }); self.inner.semaphore.close(); + self.inner.notify_rx_closed.notify_waiters(); } /// Receive the next value diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 59456375297..09f71f218b4 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -210,4 +210,39 @@ impl UnboundedSender { } } } + + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); + /// let mut tx2 = tx1.clone(); + /// let mut tx3 = tx1.clone(); + /// let mut tx4 = tx1.clone(); + /// let mut tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&mut self) { + self.chan.closed().await + } } diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index e8db2dea4ca..330e798bcdf 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -40,6 +40,34 @@ fn closing_unbounded_tx() { }); } +#[test] +fn closing_bounded_rx() { + loom::model(|| { + let (mut tx1, rx) = mpsc::channel::<()>(16); + let mut tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + +#[test] +fn closing_unbounded_rx() { + loom::model(|| { + let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); + let mut tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + #[test] fn dropping_tx() { loom::model(|| {