Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add watch::Sender::send_modify method #4310

Merged
merged 11 commits into from Feb 22, 2022
40 changes: 40 additions & 0 deletions tokio/src/sync/watch.rs
Expand Up @@ -437,6 +437,46 @@ impl<T> Sender<T> {
Ok(())
}

/// Modifies watched value, notifying all receivers.
///
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// # Examples
/// ```
/// use tokio::sync::watch;
///
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
/// state_tx.send_modify(|state| state.counter += 1);
/// assert_eq!(state_rx.borrow().counter, 1);
/// ```
pub fn send_modify<F>(&self, func: F)
where
F: FnOnce(&mut T),
{
nylonicious marked this conversation as resolved.
Show resolved Hide resolved
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
// Update the value.
func(&mut lock);
nylonicious marked this conversation as resolved.
Show resolved Hide resolved

self.shared.state.increment_version();

// Release the write lock.
//
// Incrementing the version counter while holding the lock ensures
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
}

self.shared.notify_rx.notify_waiters();
}

/// Sends a new value via the channel, notifying all receivers and returning
/// the previous value in the channel.
///
Expand Down