From c71360355ab1a5a320f5f8d784f28dc291fb0c7e Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 31 Mar 2022 11:47:32 +0200 Subject: [PATCH 1/4] watch::Sender: Modify and send value conditionally Add a function that is more versatile than send_modify(). The result of the passed closure indicates if the mutably borrowed value has actually been modified or not. Receivers are only notified if the value has been modified as indicated by the sender. Signed-off-by: Uwe Klotz --- tokio/src/sync/watch.rs | 67 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 42be67613e7..0f076ff36b8 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -574,13 +574,73 @@ impl Sender { pub fn send_modify(&self, func: F) where F: FnOnce(&mut T), + { + self.send_if_modified(|value| { + func(value); + true + }); + } + + /// Modifies watched value, notifying all receivers if modified. + /// + /// This can useful for modifying the watched value, without + /// having to allocate a new instance. Additionally, this + /// method permits sending values even when there are no receivers. + /// + /// The closure that modifies the value must return `true` if the + /// value has actually been modified. It should only return `false` + /// if the value is guaranteed to be unnmodified despite the mutable + /// borrow. Receivers are only notified if the value has been modified. + /// + /// Returns the result of the closure, i.e. `true` if the value has + /// been modified and `false` otherwise. + /// + /// # Panics + /// + /// This function panics if calling `func` results in a panic. + /// No receivers are notified if panic occurred, but if the closure has modified + /// the value, that change is still visible to future calls to `borrow`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// struct State { + /// counter: usize, + /// } + /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); + /// let inc_counter_if_odd = |state: &mut State| { + /// if state.counter % 2 == 1 { + /// state.counter += 1; + /// return true; + /// } + /// false + /// }; + /// + /// assert_eq!(state_rx.borrow().counter, 1); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(!state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(!state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// ``` + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); // Update the value and catch possible panic inside func. + let mut modified = false; let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - func(&mut lock); + modified = modify(&mut lock); })); // If the func panicked return the panic to the caller. if let Err(error) = result { @@ -588,6 +648,9 @@ impl Sender { drop(lock); panic::resume_unwind(error); } + if !modified { + return false; + } self.shared.state.increment_version(); @@ -600,6 +663,8 @@ impl Sender { } self.shared.notify_rx.notify_waiters(); + + true } /// Sends a new value via the channel, notifying all receivers and returning From 2acc5deddaafcb83d1c2943b4c1e637c679462cb Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 3 Apr 2022 13:58:39 +0200 Subject: [PATCH 2/4] watch::Sender Streamline control flow in send_if_modified() Avoid using mutable local variables. --- tokio/src/sync/watch.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 0f076ff36b8..401b68b50e4 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -637,20 +637,24 @@ impl Sender { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); + // Update the value and catch possible panic inside func. - let mut modified = false; - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - modified = modify(&mut lock); - })); - // If the func panicked return the panic to the caller. - if let Err(error) = result { - // Drop the lock to avoid poisoning it. - drop(lock); - panic::resume_unwind(error); - } - if !modified { - return false; - } + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock))); + match result { + Ok(true) => { + // Continue if modified + }, + Ok(false) => { + // Don't notify receivers if unmodified + return false; + } + Err(panicked) => { + // Drop the lock to avoid poisoning it. + drop(lock); + // Forward the panic to the caller. + panic::resume_unwind(panicked); + } + }; self.shared.state.increment_version(); From 6bd5cc5b7fff33ae3692f101f016fc9487d88316 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 3 Apr 2022 14:18:21 +0200 Subject: [PATCH 3/4] watch::Sender Improve readability of send_if_modified() --- tokio/src/sync/watch.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 401b68b50e4..d18a2fbfad2 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -641,18 +641,19 @@ impl Sender { // Update the value and catch possible panic inside func. let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock))); match result { - Ok(true) => { + Ok(modified) => { + if !modified { + // Abort, i.e. don't notify receivers if unmodified + return false; + } // Continue if modified - }, - Ok(false) => { - // Don't notify receivers if unmodified - return false; } Err(panicked) => { // Drop the lock to avoid poisoning it. drop(lock); // Forward the panic to the caller. panic::resume_unwind(panicked); + // Unreachable } }; From e577371b31cb91bfb9d5f8bcac7b791d25069b52 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 7 Apr 2022 15:51:19 +0200 Subject: [PATCH 4/4] sync::watch: Improve docs of Sender::send_modify()/send_if_modified() --- tokio/src/sync/watch.rs | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index d18a2fbfad2..d6b149de1bc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -547,17 +547,23 @@ impl Sender { Ok(()) } - /// Modifies watched value, notifying all receivers. + /// Modifies the watched value **unconditionally** in-place, + /// notifying all receivers. /// /// This can useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// + /// Prefer to use the more versatile function [`Self::send_if_modified()`] + /// if the value is only modified conditionally during the mutable borrow + /// to prevent unneeded change notifications for unmodified values. + /// /// # Panics /// - /// This function panics if calling `func` results in a panic. - /// No receivers are notified if panic occurred, but if the closure has modified - /// the value, that change is still visible to future calls to `borrow`. + /// This function panics when the invocation of the `modify` closure panics. + /// No receivers are notified when panicking. All changes of the watched + /// value applied by the closure before panicking will be visible in + /// subsequent calls to `borrow`. /// /// # Examples /// @@ -571,35 +577,43 @@ impl Sender { /// state_tx.send_modify(|state| state.counter += 1); /// assert_eq!(state_rx.borrow().counter, 1); /// ``` - pub fn send_modify(&self, func: F) + pub fn send_modify(&self, modify: F) where F: FnOnce(&mut T), { self.send_if_modified(|value| { - func(value); + modify(value); true }); } - /// Modifies watched value, notifying all receivers if modified. + /// Modifies the watched value **conditionally** in-place, + /// notifying all receivers only if modified. /// /// This can useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// - /// The closure that modifies the value must return `true` if the - /// value has actually been modified. It should only return `false` + /// The `modify` closure must return `true` if the value has actually + /// been modified during the mutable borrow. It should only return `false` /// if the value is guaranteed to be unnmodified despite the mutable - /// borrow. Receivers are only notified if the value has been modified. + /// borrow. + /// + /// Receivers are only notified if the closure returned `true`. If the + /// closure has modified the value but returned `false` this results + /// in a *silent modification*, i.e. the modified value will be visible + /// in subsequent calls to `borrow`, but receivers will not receive + /// a change notification. /// /// Returns the result of the closure, i.e. `true` if the value has /// been modified and `false` otherwise. /// /// # Panics /// - /// This function panics if calling `func` results in a panic. - /// No receivers are notified if panic occurred, but if the closure has modified - /// the value, that change is still visible to future calls to `borrow`. + /// This function panics when the invocation of the `modify` closure panics. + /// No receivers are notified when panicking. All changes of the watched + /// value applied by the closure before panicking will be visible in + /// subsequent calls to `borrow`. /// /// # Examples ///