diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 3d52e4b0a10..dd95a8439e7 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -58,6 +58,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; +use std::mem; use std::ops; /// Receives values from the associated [`Sender`](struct@Sender). @@ -423,15 +424,34 @@ impl Drop for Receiver { impl Sender { /// Sends a new value via the channel, notifying all receivers. pub fn send(&self, value: T) -> Result<(), error::SendError> { + self.send_replace(value)?; + Ok(()) + } + + /// Sends a new value via the channel, notifying all receivers and returning + /// the previous value in the channel. + /// + /// This can be useful for reusing the buffers inside a watched value. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// let (tx, _rx) = watch::channel(1); + /// assert_eq!(tx.send_replace(2).unwrap(), 1); + /// assert_eq!(tx.send_replace(3).unwrap(), 2); + /// ``` + pub fn send_replace(&self, value: T) -> Result> { // This is pretty much only useful as a hint anyway, so synchronization isn't critical. if 0 == self.shared.ref_count_rx.load(Relaxed) { return Err(error::SendError { inner: value }); } - { + let old = { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); - *lock = value; + let old = mem::replace(&mut *lock, value); self.shared.state.increment_version(); @@ -441,12 +461,14 @@ impl Sender { // that receivers are able to figure out the version number of the // value they are currently looking at. drop(lock); - } + + old + }; // Notify all watchers self.shared.notify_rx.notify_waiters(); - Ok(()) + Ok(old) } /// Returns a reference to the most recently sent value