diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 42be67613e7..99f27a0e218 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -547,12 +547,20 @@ impl Sender { Ok(()) } - /// Modifies watched value, notifying all receivers. + /// Modifies watched value, notifying all receivers if modified. /// /// This can useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// + /// The closure that modifies the value must return `true` if the + /// value has actually been modified. It should only return `false` + /// if the value is guaranteed to be unnmodified despite the mutable + /// borrow. Receivers are only notified if the value has been modified. + /// + /// Returns the result of the closure, i.e. `true` if the value has + /// been modified and `false` otherwise. + /// /// # Panics /// /// This function panics if calling `func` results in a panic. @@ -567,20 +575,38 @@ impl Sender { /// struct State { /// counter: usize, /// } - /// let (state_tx, state_rx) = watch::channel(State { counter: 0 }); - /// state_tx.send_modify(|state| state.counter += 1); + /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 }); + /// let inc_counter_if_odd = |state: &mut State| { + /// if state.counter % 2 == 1 { + /// state.counter += 1; + /// return true; + /// } + /// false + /// }; + /// /// assert_eq!(state_rx.borrow().counter, 1); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(state_tx.send_modify(inc_counter_if_odd)); + /// assert!(state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); + /// + /// assert!(!state_rx.has_changed().unwrap()); + /// assert!(!state_tx.send_modify(inc_counter_if_odd)); + /// assert!(!state_rx.has_changed().unwrap()); + /// assert_eq!(state_rx.borrow_and_update().counter, 2); /// ``` - pub fn send_modify(&self, func: F) + pub fn send_modify(&self, func: F) -> bool where - F: FnOnce(&mut T), + F: FnOnce(&mut T) -> bool, { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); // Update the value and catch possible panic inside func. + let mut modified = false; let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - func(&mut lock); + modified = func(&mut lock); })); // If the func panicked return the panic to the caller. if let Err(error) = result { @@ -588,6 +614,9 @@ impl Sender { drop(lock); panic::resume_unwind(error); } + if !modified { + return false; + } self.shared.state.increment_version(); @@ -600,6 +629,8 @@ impl Sender { } self.shared.notify_rx.notify_waiters(); + + true } /// Sends a new value via the channel, notifying all receivers and returning @@ -620,7 +651,10 @@ impl Sender { /// ``` pub fn send_replace(&self, mut value: T) -> T { // swap old watched value with the new one - self.send_modify(|old| mem::swap(old, &mut value)); + self.send_modify(|old| { + mem::swap(old, &mut value); + true + }); value } diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index d47f0df7326..eaa107f5901 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -217,7 +217,10 @@ fn reopened_after_subscribe() { fn send_modify_panic() { let (tx, mut rx) = watch::channel("one"); - tx.send_modify(|old| *old = "two"); + tx.send_modify(|old| { + *old = "two"; + true + }); assert_eq!(*rx.borrow_and_update(), "two"); let mut rx2 = rx.clone(); @@ -236,7 +239,10 @@ fn send_modify_panic() { assert_pending!(task.poll()); assert_eq!(*rx.borrow(), "panicked"); - tx.send_modify(|old| *old = "three"); + tx.send_modify(|old| { + *old = "three"; + true + }); assert_ready_ok!(task.poll()); assert_eq!(*rx.borrow_and_update(), "three"); }