From c71360355ab1a5a320f5f8d784f28dc291fb0c7e Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 31 Mar 2022 11:47:32 +0200 Subject: [PATCH] watch::Sender: Modify and send value conditionally Add a function that is more versatile than send_modify(). The result of the passed closure indicates if the mutably borrowed value has actually been modified or not. Receivers are only notified if the value has been modified as indicated by the sender. Signed-off-by: Uwe Klotz --- tokio/src/sync/watch.rs | 67 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 42be67613e7..0f076ff36b8 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -574,13 +574,73 @@ impl Sender { pub fn send_modify(&self, func: F) where F: FnOnce(&mut T), + { + self.send_if_modified(|value| { + func(value); + true + }); + } + + /// Modifies watched value, notifying all receivers if modified. + /// + /// This can useful for modifying the watched value, without + /// having to allocate a new instance. Additionally, this + /// method permits sending values even when there are no receivers. + /// + /// The closure that modifies the value must return `true` if the + /// value has actually been modified. It should only return `false` + /// if the value is guaranteed to be unnmodified despite the mutable + /// borrow. Receivers are only notified if the value has been modified. + /// + /// Returns the result of the closure, i.e. `true` if the value has + /// been modified and `false` otherwise. + /// + /// # Panics + /// + /// This function panics if calling `func` results in a panic. + /// No receivers are notified if panic occurred, but if the closure has modified + /// the value, that change is still visible to future calls to `borrow`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// struct State { + /// counter: usize, + /// } + /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); + /// let inc_counter_if_odd = |state: &mut State| { + /// if state.counter % 2 == 1 { + /// state.counter += 1; + /// return true; + /// } + /// false + /// }; + /// + /// assert_eq!(state_rx.borrow().counter, 1); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(!state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(!state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// ``` + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); // Update the value and catch possible panic inside func. + let mut modified = false; let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - func(&mut lock); + modified = modify(&mut lock); })); // If the func panicked return the panic to the caller. if let Err(error) = result { @@ -588,6 +648,9 @@ impl Sender { drop(lock); panic::resume_unwind(error); } + if !modified { + return false; + } self.shared.state.increment_version(); @@ -600,6 +663,8 @@ impl Sender { } self.shared.notify_rx.notify_waiters(); + + true } /// Sends a new value via the channel, notifying all receivers and returning