Skip to content

Commit

Permalink
sync: add a has_changed method to watch::Receiver (#4342)
Browse files Browse the repository at this point in the history
  • Loading branch information
elichai committed Dec 31, 2021
1 parent c301f6d commit 12dd063
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
42 changes: 42 additions & 0 deletions tokio/src/sync/watch.rs
Expand Up @@ -318,6 +318,48 @@ impl<T> Receiver<T> {
Ref { inner }
}

/// Checks if this channel contains a message that this receiver has not yet
/// seen. The new value is not marked as seen.
///
/// Although this method is called `has_changed`, it does not check new
/// messages for equality, so this call will return true even if the new
/// message is equal to the old message.
///
/// Returns an error if the channel has been closed.
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
///
/// assert!(rx.has_changed().unwrap());
/// assert_eq!(*rx.borrow_and_update(), "goodbye");
///
/// // The value has been marked as seen
/// assert!(!rx.has_changed().unwrap());
///
/// drop(tx);
/// // The `tx` handle has been dropped
/// assert!(rx.has_changed().is_err());
/// }
/// ```
pub fn has_changed(&self) -> Result<bool, error::RecvError> {
// Load the version from the state
let state = self.shared.state.load();
if state.is_closed() {
// The sender has dropped.
return Err(error::RecvError(()));
}
let new_version = state.version();

Ok(self.version != new_version)
}

/// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
Expand Down
7 changes: 7 additions & 0 deletions tokio/tests/sync_watch.rs
Expand Up @@ -174,17 +174,24 @@ fn poll_close() {
fn borrow_and_update() {
let (tx, mut rx) = watch::channel("one");

assert!(!rx.has_changed().unwrap());

tx.send("two").unwrap();
assert!(rx.has_changed().unwrap());
assert_ready!(spawn(rx.changed()).poll()).unwrap();
assert_pending!(spawn(rx.changed()).poll());
assert!(!rx.has_changed().unwrap());

tx.send("three").unwrap();
assert!(rx.has_changed().unwrap());
assert_eq!(*rx.borrow_and_update(), "three");
assert_pending!(spawn(rx.changed()).poll());
assert!(!rx.has_changed().unwrap());

drop(tx);
assert_eq!(*rx.borrow_and_update(), "three");
assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
assert!(rx.has_changed().is_err());
}

#[test]
Expand Down

0 comments on commit 12dd063

Please sign in to comment.