From dee3236c97b5502290046b9d677e108089188a9e Mon Sep 17 00:00:00 2001 From: Kai Jewson Date: Sat, 25 Sep 2021 21:06:40 +0100 Subject: [PATCH] sync: add `watch::Sender::send_replace` (#3962) --- tokio/src/sync/watch.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index b5da2184f7f..0293e6293c6 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -58,6 +58,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; +use std::mem; use std::ops; /// Receives values from the associated [`Sender`](struct@Sender). @@ -427,15 +428,34 @@ impl Sender { /// This method fails if the channel has been closed, which happens when /// every receiver has been dropped. pub fn send(&self, value: T) -> Result<(), error::SendError> { + self.send_replace(value)?; + Ok(()) + } + + /// Sends a new value via the channel, notifying all receivers and returning + /// the previous value in the channel. + /// + /// This can be useful for reusing the buffers inside a watched value. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// let (tx, _rx) = watch::channel(1); + /// assert_eq!(tx.send_replace(2).unwrap(), 1); + /// assert_eq!(tx.send_replace(3).unwrap(), 2); + /// ``` + pub fn send_replace(&self, value: T) -> Result> { // This is pretty much only useful as a hint anyway, so synchronization isn't critical. if 0 == self.receiver_count() { return Err(error::SendError(value)); } - { + let old = { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); - *lock = value; + let old = mem::replace(&mut *lock, value); self.shared.state.increment_version(); @@ -445,12 +465,14 @@ impl Sender { // that receivers are able to figure out the version number of the // value they are currently looking at. drop(lock); - } + + old + }; // Notify all watchers self.shared.notify_rx.notify_waiters(); - Ok(()) + Ok(old) } /// Returns a reference to the most recently sent value