From c14566e9df5c71de8ac393dd40973362282ac157 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 3 May 2022 08:57:10 +0200 Subject: [PATCH] watch: modify and send value conditionally (#4591) Add a function that is more versatile than send_modify(). The result of the passed closure indicates if the mutably borrowed value has actually been modified or not. Receivers are only notified if the value has been modified as indicated by the sender. Signed-off-by: Uwe Klotz --- tokio/src/sync/watch.rs | 112 +++++++++++++++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 14 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index afeb7c2c9f7..184ba4b6041 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -547,17 +547,23 @@ impl Sender { Ok(()) } - /// Modifies watched value, notifying all receivers. + /// Modifies the watched value **unconditionally** in-place, + /// notifying all receivers. /// /// This can useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// + /// Prefer to use the more versatile function [`Self::send_if_modified()`] + /// if the value is only modified conditionally during the mutable borrow + /// to prevent unneeded change notifications for unmodified values. + /// /// # Panics /// - /// This function panics if calling `func` results in a panic. - /// No receivers are notified if panic occurred, but if the closure has modified - /// the value, that change is still visible to future calls to `borrow`. + /// This function panics when the invocation of the `modify` closure panics. + /// No receivers are notified when panicking. All changes of the watched + /// value applied by the closure before panicking will be visible in + /// subsequent calls to `borrow`. /// /// # Examples /// @@ -571,23 +577,99 @@ impl Sender { /// state_tx.send_modify(|state| state.counter += 1); /// assert_eq!(state_rx.borrow().counter, 1); /// ``` - pub fn send_modify(&self, func: F) + pub fn send_modify(&self, modify: F) where F: FnOnce(&mut T), + { + self.send_if_modified(|value| { + modify(value); + true + }); + } + + /// Modifies the watched value **conditionally** in-place, + /// notifying all receivers only if modified. + /// + /// This can useful for modifying the watched value, without + /// having to allocate a new instance. Additionally, this + /// method permits sending values even when there are no receivers. + /// + /// The `modify` closure must return `true` if the value has actually + /// been modified during the mutable borrow. It should only return `false` + /// if the value is guaranteed to be unnmodified despite the mutable + /// borrow. + /// + /// Receivers are only notified if the closure returned `true`. If the + /// closure has modified the value but returned `false` this results + /// in a *silent modification*, i.e. the modified value will be visible + /// in subsequent calls to `borrow`, but receivers will not receive + /// a change notification. + /// + /// Returns the result of the closure, i.e. `true` if the value has + /// been modified and `false` otherwise. + /// + /// # Panics + /// + /// This function panics when the invocation of the `modify` closure panics. + /// No receivers are notified when panicking. All changes of the watched + /// value applied by the closure before panicking will be visible in + /// subsequent calls to `borrow`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// struct State { + /// counter: usize, + /// } + /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); + /// let inc_counter_if_odd = |state: &mut State| { + /// if state.counter % 2 == 1 { + /// state.counter += 1; + /// return true; + /// } + /// false + /// }; + /// + /// assert_eq!(state_rx.borrow().counter, 1); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(!state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(!state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// ``` + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); + // Update the value and catch possible panic inside func. - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - func(&mut lock); - })); - // If the func panicked return the panic to the caller. - if let Err(error) = result { - // Drop the lock to avoid poisoning it. - drop(lock); - panic::resume_unwind(error); - } + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock))); + match result { + Ok(modified) => { + if !modified { + // Abort, i.e. don't notify receivers if unmodified + return false; + } + // Continue if modified + } + Err(panicked) => { + // Drop the lock to avoid poisoning it. + drop(lock); + // Forward the panic to the caller. + panic::resume_unwind(panicked); + // Unreachable + } + }; self.shared.state.increment_version(); @@ -600,6 +682,8 @@ impl Sender { } self.shared.notify_rx.notify_waiters(); + + true } /// Sends a new value via the channel, notifying all receivers and returning