Skip to content

Commit

Permalink
watch::Sender: Modify and send value conditionally
Browse files Browse the repository at this point in the history
Add a function that is more versatile than send_modify(). The result of
the passed closure indicates if the mutably borrowed value has actually
been modified or not. Receivers are only notified if the value has been
modified as indicated by the sender.

Signed-off-by: Uwe Klotz <uwe.klotz@slowtec.de>
  • Loading branch information
uklotzde committed Mar 31, 2022
1 parent 702d6dc commit c713603
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion tokio/src/sync/watch.rs
Expand Up @@ -574,20 +574,83 @@ impl<T> Sender<T> {
pub fn send_modify<F>(&self, func: F)
where
F: FnOnce(&mut T),
{
self.send_if_modified(|value| {
func(value);
true
});
}

/// Modifies watched value, notifying all receivers if modified.
///
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// The closure that modifies the value must return `true` if the
/// value has actually been modified. It should only return `false`
/// if the value is guaranteed to be unnmodified despite the mutable
/// borrow. Receivers are only notified if the value has been modified.
///
/// Returns the result of the closure, i.e. `true` if the value has
/// been modified and `false` otherwise.
///
/// # Panics
///
/// This function panics if calling `func` results in a panic.
/// No receivers are notified if panic occurred, but if the closure has modified
/// the value, that change is still visible to future calls to `borrow`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
/// let inc_counter_if_odd = |state: &mut State| {
/// if state.counter % 2 == 1 {
/// state.counter += 1;
/// return true;
/// }
/// false
/// };
///
/// assert_eq!(state_rx.borrow().counter, 1);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(!state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
/// ```
pub fn send_if_modified<F>(&self, modify: F) -> bool
where
F: FnOnce(&mut T) -> bool,
{
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
// Update the value and catch possible panic inside func.
let mut modified = false;
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(&mut lock);
modified = modify(&mut lock);
}));
// If the func panicked return the panic to the caller.
if let Err(error) = result {
// Drop the lock to avoid poisoning it.
drop(lock);
panic::resume_unwind(error);
}
if !modified {
return false;
}

self.shared.state.increment_version();

Expand All @@ -600,6 +663,8 @@ impl<T> Sender<T> {
}

self.shared.notify_rx.notify_waiters();

true
}

/// Sends a new value via the channel, notifying all receivers and returning
Expand Down

0 comments on commit c713603

Please sign in to comment.