From c64ca70dbc328670f5b50f4197286e66bf7a5b51 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 17 Sep 2020 14:07:32 +0300 Subject: [PATCH 1/6] sync: add `mpsc::Sender::closed` future Fixes: #2800 Signed-off-by: Zahari Dichev --- tokio/src/sync/mpsc/bounded.rs | 35 +++++++++++++++++++++++ tokio/src/sync/mpsc/chan.rs | 46 ++++++++++++++++++++++++++++++- tokio/src/sync/mpsc/unbounded.rs | 35 +++++++++++++++++++++++ tokio/src/sync/tests/loom_mpsc.rs | 28 +++++++++++++++++++ 4 files changed, 143 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 2d2006d5883..542eae2782b 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -320,6 +320,41 @@ impl Sender { } } + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx1, rx) = mpsc::channel::<()>(1); + /// let mut tx2 = tx1.clone(); + /// let mut tx3 = tx1.clone(); + /// let mut tx4 = tx1.clone(); + /// let mut tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&mut self) { + self.chan.closed().await + } + /// Attempts to immediately send a message on this `Sender` /// /// This method differs from [`send`] by returning immediately if the channel's diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 2d3f014996a..ebe0f54058b 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,13 +1,15 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; +use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::list; +use crate::sync::Notify; use std::fmt; use std::process; -use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::sync::atomic::Ordering::{AcqRel, Relaxed, SeqCst}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; @@ -44,6 +46,12 @@ pub(crate) trait Semaphore { } struct Chan { + /// Notifies all tasks listening for the receiver being dropped + notify_rx_closed: Notify, + + /// Indicates whether the receiver has been dropped + rx_closed: AtomicBool, + /// Handle to the push half of the lock-free list. tx: list::Tx, @@ -102,6 +110,8 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { + rx_closed: AtomicBool::new(false), + notify_rx_closed: Notify::new(), tx, semaphore, rx_waker: AtomicWaker::new(), @@ -131,6 +141,38 @@ impl Tx { self.inner.send(value); } + pub(crate) async fn closed(&mut self) { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. Requesting the notification + // requires polling the future once. + let notified = self.inner.notify_rx_closed.notified(); + pin!(notified); + + // Polling this for first time will register the waiter and + // return `Pending` or return `Ready` right away. If `Ready` + // is returned we are done + let aquired_lost_notification = + crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) { + Poll::Ready(()) => Poll::Ready(true), + Poll::Pending => Poll::Ready(false), + }) + .await; + + if aquired_lost_notification { + return; + } + + if self.inner.rx_closed.load(SeqCst) { + return; + } + notified.await; + } + /// Wake the receive half pub(crate) fn wake_rx(&self) { self.inner.rx_waker.wake(); @@ -182,6 +224,8 @@ impl Rx { }); self.inner.semaphore.close(); + self.inner.rx_closed.store(true, SeqCst); + self.inner.notify_rx_closed.notify_waiters(); } /// Receive the next value diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 59456375297..09f71f218b4 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -210,4 +210,39 @@ impl UnboundedSender { } } } + + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); + /// let mut tx2 = tx1.clone(); + /// let mut tx3 = tx1.clone(); + /// let mut tx4 = tx1.clone(); + /// let mut tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&mut self) { + self.chan.closed().await + } } diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index e8db2dea4ca..330e798bcdf 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -40,6 +40,34 @@ fn closing_unbounded_tx() { }); } +#[test] +fn closing_bounded_rx() { + loom::model(|| { + let (mut tx1, rx) = mpsc::channel::<()>(16); + let mut tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + +#[test] +fn closing_unbounded_rx() { + loom::model(|| { + let (mut tx1, rx) = mpsc::unbounded_channel::<()>(); + let mut tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + #[test] fn dropping_tx() { loom::model(|| { From c4899a7891a26452b75be1f91b97fac3bd2d506b Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 22 Sep 2020 11:27:03 +0300 Subject: [PATCH 2/6] Track closed state via the semapthore Signed-off-by: Zahari Dichev --- tokio/src/sync/mpsc/chan.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index ebe0f54058b..96eacfe8261 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,6 +1,5 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; -use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::sync::mpsc::error::TryRecvError; @@ -49,9 +48,6 @@ struct Chan { /// Notifies all tasks listening for the receiver being dropped notify_rx_closed: Notify, - /// Indicates whether the receiver has been dropped - rx_closed: AtomicBool, - /// Handle to the push half of the lock-free list. tx: list::Tx, @@ -110,7 +106,6 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { - rx_closed: AtomicBool::new(false), notify_rx_closed: Notify::new(), tx, semaphore, @@ -167,7 +162,7 @@ impl Tx { return; } - if self.inner.rx_closed.load(SeqCst) { + if self.inner.semaphore.is_closed() { return; } notified.await; @@ -224,7 +219,6 @@ impl Rx { }); self.inner.semaphore.close(); - self.inner.rx_closed.store(true, SeqCst); self.inner.notify_rx_closed.notify_waiters(); } From 60ef273b32c901b01863f26f91d36915d8111e21 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 22 Sep 2020 11:30:47 +0300 Subject: [PATCH 3/6] Always expect `Pending` when polling the `Notified` Signed-off-by: Zahari Dichev --- tokio/src/sync/mpsc/chan.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 96eacfe8261..001f4b13df5 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -8,7 +8,7 @@ use crate::sync::Notify; use std::fmt; use std::process; -use std::sync::atomic::Ordering::{AcqRel, Relaxed, SeqCst}; +use std::sync::atomic::Ordering::{AcqRel, Relaxed}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; @@ -148,19 +148,14 @@ impl Tx { let notified = self.inner.notify_rx_closed.notified(); pin!(notified); - // Polling this for first time will register the waiter and - // return `Pending` or return `Ready` right away. If `Ready` - // is returned we are done - let aquired_lost_notification = - crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) { - Poll::Ready(()) => Poll::Ready(true), - Poll::Pending => Poll::Ready(false), - }) - .await; - - if aquired_lost_notification { - return; - } + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) + }) + .await; if self.inner.semaphore.is_closed() { return; From edd97a2024c961f0af41f9bfeca709a5eec9c76e Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 23 Sep 2020 12:13:50 +0300 Subject: [PATCH 4/6] Fix features errors Signed-off-by: Zahari Dichev --- tokio/src/sync/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 6531931b365..853a22dafc1 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -473,6 +473,7 @@ cfg_not_sync! { cfg_signal_internal! { pub(crate) mod mpsc; pub(crate) mod batch_semaphore; + pub(crate) mod notify; } } From 19bfab1ae19bc63f1d017047da8529c0d4c26460 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 25 Sep 2020 09:25:48 +0300 Subject: [PATCH 5/6] Add Semaphore bound for closed impl Signed-off-by: Zahari Dichev --- tokio/src/sync/mpsc/chan.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 001f4b13df5..e7b951ed733 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -136,6 +136,13 @@ impl Tx { self.inner.send(value); } + /// Wake the receive half + pub(crate) fn wake_rx(&self) { + self.inner.rx_waker.wake(); + } +} + +impl Tx { pub(crate) async fn closed(&mut self) { use std::future::Future; use std::pin::Pin; @@ -162,11 +169,6 @@ impl Tx { } notified.await; } - - /// Wake the receive half - pub(crate) fn wake_rx(&self) { - self.inner.rx_waker.wake(); - } } impl Clone for Tx { From 89c47fce10932ca6f6c5312cf62cb57df81e08c9 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 25 Sep 2020 09:47:41 +0300 Subject: [PATCH 6/6] Remove duplicate mod Signed-off-by: Zahari Dichev --- tokio/src/sync/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 853a22dafc1..6531931b365 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -473,7 +473,6 @@ cfg_not_sync! { cfg_signal_internal! { pub(crate) mod mpsc; pub(crate) mod batch_semaphore; - pub(crate) mod notify; } }