Skip to content

Commit

Permalink
watch: modify and send value conditionally (#4591)
Browse files Browse the repository at this point in the history
Add a function that is more versatile than send_modify(). The result of
the passed closure indicates if the mutably borrowed value has actually
been modified or not. Receivers are only notified if the value has been
modified as indicated by the sender.

Signed-off-by: Uwe Klotz <uwe.klotz@slowtec.de>
  • Loading branch information
uklotzde committed May 3, 2022
1 parent 148bea8 commit c14566e
Showing 1 changed file with 98 additions and 14 deletions.
112 changes: 98 additions & 14 deletions tokio/src/sync/watch.rs
Expand Up @@ -547,17 +547,23 @@ impl<T> Sender<T> {
Ok(())
}

/// Modifies watched value, notifying all receivers.
/// Modifies the watched value **unconditionally** in-place,
/// notifying all receivers.
///
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// Prefer to use the more versatile function [`Self::send_if_modified()`]
/// if the value is only modified conditionally during the mutable borrow
/// to prevent unneeded change notifications for unmodified values.
///
/// # Panics
///
/// This function panics if calling `func` results in a panic.
/// No receivers are notified if panic occurred, but if the closure has modified
/// the value, that change is still visible to future calls to `borrow`.
/// This function panics when the invocation of the `modify` closure panics.
/// No receivers are notified when panicking. All changes of the watched
/// value applied by the closure before panicking will be visible in
/// subsequent calls to `borrow`.
///
/// # Examples
///
Expand All @@ -571,23 +577,99 @@ impl<T> Sender<T> {
/// state_tx.send_modify(|state| state.counter += 1);
/// assert_eq!(state_rx.borrow().counter, 1);
/// ```
pub fn send_modify<F>(&self, func: F)
pub fn send_modify<F>(&self, modify: F)
where
F: FnOnce(&mut T),
{
self.send_if_modified(|value| {
modify(value);
true
});
}

/// Modifies the watched value **conditionally** in-place,
/// notifying all receivers only if modified.
///
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// The `modify` closure must return `true` if the value has actually
/// been modified during the mutable borrow. It should only return `false`
/// if the value is guaranteed to be unnmodified despite the mutable
/// borrow.
///
/// Receivers are only notified if the closure returned `true`. If the
/// closure has modified the value but returned `false` this results
/// in a *silent modification*, i.e. the modified value will be visible
/// in subsequent calls to `borrow`, but receivers will not receive
/// a change notification.
///
/// Returns the result of the closure, i.e. `true` if the value has
/// been modified and `false` otherwise.
///
/// # Panics
///
/// This function panics when the invocation of the `modify` closure panics.
/// No receivers are notified when panicking. All changes of the watched
/// value applied by the closure before panicking will be visible in
/// subsequent calls to `borrow`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
/// let inc_counter_if_odd = |state: &mut State| {
/// if state.counter % 2 == 1 {
/// state.counter += 1;
/// return true;
/// }
/// false
/// };
///
/// assert_eq!(state_rx.borrow().counter, 1);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(!state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
/// ```
pub fn send_if_modified<F>(&self, modify: F) -> bool
where
F: FnOnce(&mut T) -> bool,
{
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();

// Update the value and catch possible panic inside func.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(&mut lock);
}));
// If the func panicked return the panic to the caller.
if let Err(error) = result {
// Drop the lock to avoid poisoning it.
drop(lock);
panic::resume_unwind(error);
}
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
match result {
Ok(modified) => {
if !modified {
// Abort, i.e. don't notify receivers if unmodified
return false;
}
// Continue if modified
}
Err(panicked) => {
// Drop the lock to avoid poisoning it.
drop(lock);
// Forward the panic to the caller.
panic::resume_unwind(panicked);
// Unreachable
}
};

self.shared.state.increment_version();

Expand All @@ -600,6 +682,8 @@ impl<T> Sender<T> {
}

self.shared.notify_rx.notify_waiters();

true
}

/// Sends a new value via the channel, notifying all receivers and returning
Expand Down

0 comments on commit c14566e

Please sign in to comment.