diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index afeb7c2c9f7..184ba4b6041 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -547,17 +547,23 @@ impl Sender { Ok(()) } - /// Modifies watched value, notifying all receivers. + /// Modifies the watched value **unconditionally** in-place, + /// notifying all receivers. /// /// This can useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// + /// Prefer to use the more versatile function [`Self::send_if_modified()`] + /// if the value is only modified conditionally during the mutable borrow + /// to prevent unneeded change notifications for unmodified values. + /// /// # Panics /// - /// This function panics if calling `func` results in a panic. - /// No receivers are notified if panic occurred, but if the closure has modified - /// the value, that change is still visible to future calls to `borrow`. + /// This function panics when the invocation of the `modify` closure panics. + /// No receivers are notified when panicking. All changes of the watched + /// value applied by the closure before panicking will be visible in + /// subsequent calls to `borrow`. /// /// # Examples /// @@ -571,23 +577,99 @@ impl Sender { /// state_tx.send_modify(|state| state.counter += 1); /// assert_eq!(state_rx.borrow().counter, 1); /// ``` - pub fn send_modify(&self, func: F) + pub fn send_modify(&self, modify: F) where F: FnOnce(&mut T), + { + self.send_if_modified(|value| { + modify(value); + true + }); + } + + /// Modifies the watched value **conditionally** in-place, + /// notifying all receivers only if modified. + /// + /// This can useful for modifying the watched value, without + /// having to allocate a new instance. Additionally, this + /// method permits sending values even when there are no receivers. + /// + /// The `modify` closure must return `true` if the value has actually + /// been modified during the mutable borrow. It should only return `false` + /// if the value is guaranteed to be unnmodified despite the mutable + /// borrow. + /// + /// Receivers are only notified if the closure returned `true`. If the + /// closure has modified the value but returned `false` this results + /// in a *silent modification*, i.e. the modified value will be visible + /// in subsequent calls to `borrow`, but receivers will not receive + /// a change notification. + /// + /// Returns the result of the closure, i.e. `true` if the value has + /// been modified and `false` otherwise. + /// + /// # Panics + /// + /// This function panics when the invocation of the `modify` closure panics. + /// No receivers are notified when panicking. All changes of the watched + /// value applied by the closure before panicking will be visible in + /// subsequent calls to `borrow`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// struct State { + /// counter: usize, + /// } + /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); + /// let inc_counter_if_odd = |state: &mut State| { + /// if state.counter % 2 == 1 { + /// state.counter += 1; + /// return true; + /// } + /// false + /// }; + /// + /// assert_eq!(state_rx.borrow().counter, 1); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(!state_tx.send_if_modified(inc_counter_if_odd)); + /// assert!(!state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// ``` + pub fn send_if_modified(&self, modify: F) -> bool + where + F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); + // Update the value and catch possible panic inside func. - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - func(&mut lock); - })); - // If the func panicked return the panic to the caller. - if let Err(error) = result { - // Drop the lock to avoid poisoning it. - drop(lock); - panic::resume_unwind(error); - } + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock))); + match result { + Ok(modified) => { + if !modified { + // Abort, i.e. don't notify receivers if unmodified + return false; + } + // Continue if modified + } + Err(panicked) => { + // Drop the lock to avoid poisoning it. + drop(lock); + // Forward the panic to the caller. + panic::resume_unwind(panicked); + // Unreachable + } + }; self.shared.state.increment_version(); @@ -600,6 +682,8 @@ impl Sender { } self.shared.notify_rx.notify_waiters(); + + true } /// Sends a new value via the channel, notifying all receivers and returning