Skip to content

Commit

Permalink
watch::Sender: Modify value conditionally
Browse files Browse the repository at this point in the history
To improve the versatility of this function the passed closure should
indicate if the mutably borrowed value has actually been modified or
not.

This is a breaking change of the Sender API!

Signed-off-by: Uwe Klotz <uwe.klotz@slowtec.de>
  • Loading branch information
uklotzde committed Mar 31, 2022
1 parent 702d6dc commit 1f3b5a3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
48 changes: 41 additions & 7 deletions tokio/src/sync/watch.rs
Expand Up @@ -547,12 +547,20 @@ impl<T> Sender<T> {
Ok(())
}

/// Modifies watched value, notifying all receivers.
/// Modifies watched value, notifying all receivers if modified.
///
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// The closure that modifies the value must return `true` if the
/// value has actually been modified. It should only return `false`
/// if the value is guaranteed to be unnmodified despite the mutable
/// borrow. Receivers are only notified if the value has been modified.
///
/// Returns the result of the closure, i.e. `true` if the value has
/// been modified and `false` otherwise.
///
/// # Panics
///
/// This function panics if calling `func` results in a panic.
Expand All @@ -567,27 +575,48 @@ impl<T> Sender<T> {
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
/// state_tx.send_modify(|state| state.counter += 1);
/// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
/// let inc_counter_if_odd = |state: &mut State| {
/// if state.counter % 2 == 1 {
/// state.counter += 1;
/// return true;
/// }
/// false
/// };
///
/// assert_eq!(state_rx.borrow().counter, 1);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(state_tx.send_modify(inc_counter_if_odd));
/// assert!(state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(!state_tx.send_modify(inc_counter_if_odd));
/// assert!(!state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
/// ```
pub fn send_modify<F>(&self, func: F)
pub fn send_modify<F>(&self, func: F) -> bool
where
F: FnOnce(&mut T),
F: FnOnce(&mut T) -> bool,
{
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
// Update the value and catch possible panic inside func.
let mut modified = false;
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(&mut lock);
modified = func(&mut lock);
}));
// If the func panicked return the panic to the caller.
if let Err(error) = result {
// Drop the lock to avoid poisoning it.
drop(lock);
panic::resume_unwind(error);
}
if !modified {
return false;
}

self.shared.state.increment_version();

Expand All @@ -600,6 +629,8 @@ impl<T> Sender<T> {
}

self.shared.notify_rx.notify_waiters();

true
}

/// Sends a new value via the channel, notifying all receivers and returning
Expand All @@ -620,7 +651,10 @@ impl<T> Sender<T> {
/// ```
pub fn send_replace(&self, mut value: T) -> T {
// swap old watched value with the new one
self.send_modify(|old| mem::swap(old, &mut value));
self.send_modify(|old| {
mem::swap(old, &mut value);
true
});

value
}
Expand Down
10 changes: 8 additions & 2 deletions tokio/tests/sync_watch.rs
Expand Up @@ -217,7 +217,10 @@ fn reopened_after_subscribe() {
fn send_modify_panic() {
let (tx, mut rx) = watch::channel("one");

tx.send_modify(|old| *old = "two");
tx.send_modify(|old| {
*old = "two";
true
});
assert_eq!(*rx.borrow_and_update(), "two");

let mut rx2 = rx.clone();
Expand All @@ -236,7 +239,10 @@ fn send_modify_panic() {
assert_pending!(task.poll());
assert_eq!(*rx.borrow(), "panicked");

tx.send_modify(|old| *old = "three");
tx.send_modify(|old| {
*old = "three";
true
});
assert_ready_ok!(task.poll());
assert_eq!(*rx.borrow_and_update(), "three");
}

0 comments on commit 1f3b5a3

Please sign in to comment.