From 12dd06336d2af8c2d735d4d9e3dc0454ad7942a0 Mon Sep 17 00:00:00 2001 From: Elichai Turkel Date: Fri, 31 Dec 2021 17:23:29 +0200 Subject: [PATCH] sync: add a `has_changed` method to `watch::Receiver` (#4342) --- tokio/src/sync/watch.rs | 42 +++++++++++++++++++++++++++++++++++++++ tokio/tests/sync_watch.rs | 7 +++++++ 2 files changed, 49 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7e45c116c82..5e827fdbb8c 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -318,6 +318,48 @@ impl Receiver { Ref { inner } } + /// Checks if this channel contains a message that this receiver has not yet + /// seen. The new value is not marked as seen. + /// + /// Although this method is called `has_changed`, it does not check new + /// messages for equality, so this call will return true even if the new + /// message is equal to the old message. + /// + /// Returns an error if the channel has been closed. + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = watch::channel("hello"); + /// + /// tx.send("goodbye").unwrap(); + /// + /// assert!(rx.has_changed().unwrap()); + /// assert_eq!(*rx.borrow_and_update(), "goodbye"); + /// + /// // The value has been marked as seen + /// assert!(!rx.has_changed().unwrap()); + /// + /// drop(tx); + /// // The `tx` handle has been dropped + /// assert!(rx.has_changed().is_err()); + /// } + /// ``` + pub fn has_changed(&self) -> Result { + // Load the version from the state + let state = self.shared.state.load(); + if state.is_closed() { + // The sender has dropped. + return Err(error::RecvError(())); + } + let new_version = state.version(); + + Ok(self.version != new_version) + } + /// Waits for a change notification, then marks the newest value as seen. /// /// If the newest value in the channel has not yet been marked seen when diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index b7bbaf721c1..b982324262c 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -174,17 +174,24 @@ fn poll_close() { fn borrow_and_update() { let (tx, mut rx) = watch::channel("one"); + assert!(!rx.has_changed().unwrap()); + tx.send("two").unwrap(); + assert!(rx.has_changed().unwrap()); assert_ready!(spawn(rx.changed()).poll()).unwrap(); assert_pending!(spawn(rx.changed()).poll()); + assert!(!rx.has_changed().unwrap()); tx.send("three").unwrap(); + assert!(rx.has_changed().unwrap()); assert_eq!(*rx.borrow_and_update(), "three"); assert_pending!(spawn(rx.changed()).poll()); + assert!(!rx.has_changed().unwrap()); drop(tx); assert_eq!(*rx.borrow_and_update(), "three"); assert_ready!(spawn(rx.changed()).poll()).unwrap_err(); + assert!(rx.has_changed().is_err()); } #[test]