diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 42be67613e7..0f076ff36b8 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -574,13 +574,73 @@ impl Sender { pub fn send_modify(&self, func: F) where F: FnOnce(&mut T), + { + self.send_if_modified(|value| { + func(value); + true + }); + } + + /// Modifies watched value, notifying all receivers if modified. + /// + /// This can useful for modifying the watched value, without + /// having to allocate a new instance. Additionally, this + /// method permits sending values even when there are no receivers. + /// + /// The closure that modifies the value must return `true` if the + /// value has actually been modified. It should only return `false` + /// if the value is guaranteed to be unnmodified despite the mutable + /// borrow. Receivers are only notified if the value has been modified. + /// + /// Returns the result of the closure, i.e. `true` if the value has + /// been modified and `false` otherwise. + /// + /// # Panics + /// + /// This function panics if calling `func` results in a panic. + /// No receivers are notified if panic occurred, but if the closure has modified + /// the value, that change is still visible to future calls to `borrow`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// struct State { + /// counter: usize, + /// } + /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); + /// let inc_counter_if_odd = |state: &mut State| { + /// if state.counter % 2 == 1 { + /// state.counter += 1; + /// return true; + /// } + /// false + /// }; + /// + /// assert_eq!(state_rx.borrow().counter, 1); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(!state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(!state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// ``` + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); // Update the value and catch possible panic inside func. + let mut modified = false; let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - func(&mut lock); + modified = modify(&mut lock); })); // If the func panicked return the panic to the caller. if let Err(error) = result { @@ -588,6 +648,9 @@ impl Sender { drop(lock); panic::resume_unwind(error); } + if !modified { + return false; + } self.shared.state.increment_version(); @@ -600,6 +663,8 @@ impl Sender { } self.shared.notify_rx.notify_waiters(); + + true } /// Sends a new value via the channel, notifying all receivers and returning