Skip to content

Commit

Permalink
sync: Add has_changed method to watch::Ref (#4758)
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Jun 29, 2022
1 parent 79d8024 commit 7ed2737
Showing 1 changed file with 75 additions and 8 deletions.
83 changes: 75 additions & 8 deletions tokio/src/sync/watch.rs
Expand Up @@ -114,6 +114,55 @@ pub struct Sender<T> {
#[derive(Debug)]
pub struct Ref<'a, T> {
inner: RwLockReadGuard<'a, T>,
has_changed: bool,
}

impl<'a, T> Ref<'a, T> {
/// Indicates if the borrowed value is considered as _changed_ since the last
/// time it has been marked as seen.
///
/// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
///
/// When borrowed from the [`Sender`] this function will always return `false`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
/// // The sender does never consider the value as changed.
/// assert!(!tx.borrow().has_changed());
///
/// // Drop the sender immediately, just for testing purposes.
/// drop(tx);
///
/// // Even if the sender has already been dropped...
/// assert!(rx.has_changed().is_err());
/// // ...the modified value is still readable and detected as changed.
/// assert_eq!(*rx.borrow(), "goodbye");
/// assert!(rx.borrow().has_changed());
///
/// // Read the changed value and mark it as seen.
/// {
/// let received = rx.borrow_and_update();
/// assert_eq!(*received, "goodbye");
/// assert!(received.has_changed());
/// // Release the read lock when leaving this scope.
/// }
///
/// // Now the value has already been marked as seen and could
/// // never be modified again (after the sender has been dropped).
/// assert!(!rx.borrow().has_changed());
/// }
/// ```
pub fn has_changed(&self) -> bool {
self.has_changed
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -336,15 +385,21 @@ impl<T> Receiver<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }

// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;

Ref { inner, has_changed }
}

/// Returns a reference to the most recently sent value and mark that value
/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the value as seen, so [`changed`] will not return
/// immediately if the newest value is one previously returned by
/// `borrow_and_update`.
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the [`Sender`] has modified the shared
/// value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
Expand Down Expand Up @@ -372,8 +427,16 @@ impl<T> Receiver<T> {
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
self.version = self.shared.state.load().version();
Ref { inner }

// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;

// Mark the shared value as seen by updating the version
self.version = new_version;

Ref { inner, has_changed }
}

/// Checks if this channel contains a message that this receiver has not yet
Expand Down Expand Up @@ -731,7 +794,11 @@ impl<T> Sender<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }

// The sender/producer always sees the current version
let has_changed = false;

Ref { inner, has_changed }
}

/// Checks if the channel has been closed. This happens when all receivers
Expand Down

0 comments on commit 7ed2737

Please sign in to comment.