Skip to content

Commit

Permalink
sync::watch::Ref: Replace updated() with has_changed()
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Jun 9, 2022
1 parent 63c9987 commit b4fef98
Showing 1 changed file with 68 additions and 26 deletions.
94 changes: 68 additions & 26 deletions tokio/src/sync/watch.rs
Expand Up @@ -112,19 +112,55 @@ pub struct Sender<T> {
#[derive(Debug)]
pub struct Ref<'a, T> {
inner: RwLockReadGuard<'a, T>,
updated: bool,
has_changed: bool,
}

impl<'a, T> Ref<'a, T> {
/// Indicates if the shared value has been detected as _changed_
/// **and** marked as seen.
///
/// The result is `true` only if this reference has been obtained
/// from [`Receiver::borrow_and_update()`] in case changes have been
/// detected and marked as seen. In all other cases the result will
/// be `false`.
pub fn updated(&self) -> bool {
self.updated
/// Indicates if the borrowed value is considered as _changed_ since the last
/// time it has been marked as seen.
///
/// Other than [`Receiver::has_changed()`] it does not fail if the [`Receiver`]
/// is orphaned after the [`Sender`] has been dropped.
///
/// When borrowed from the [`Sender`] this function will always return `false`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
/// // The sender does never consider the value as changed.
/// assert!(!tx.borrow().has_changed());
///
/// // Drop the sender immediately, just for testing purposes.
/// drop(tx);
///
/// // Even if the sender has already been dropped...
/// assert!(rx.has_changed().is_err());
/// // ...the modified value is still readable and detected as changed.
/// assert_eq!(*rx.borrow(), "goodbye");
/// assert!(rx.borrow().has_changed());
///
/// // Read the changed value and mark it as seen.
/// {
/// let received = rx.borrow_and_update();
/// assert_eq!(*received, "goodbye");
/// assert!(received.has_changed());
/// // Release the read lock when leaving this scope.
/// }
///
/// // Now the value has already been marked as seen and could
/// // never be modified again (after the sender has been dropped).
/// assert!(!rx.borrow().has_changed());
/// }
/// ```
pub fn has_changed(&self) -> bool {
self.has_changed
}
}

Expand Down Expand Up @@ -346,17 +382,21 @@ impl<T> Receiver<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
// Changes are neither detected nor marked as seen by this function
let updated = false;
Ref { inner, updated }

// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;

Ref { inner, has_changed }
}

/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the current value as seen as indicated by [`Ref::updated`].
/// Subsequent calls to [`changed`] will not return immediately until the
/// [`Sender`] has modified the shared value again.
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the [`Sender`] has modified the shared
/// value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
Expand All @@ -382,16 +422,16 @@ impl<T> Receiver<T> {
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();

// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let updated = if self.version != new_version {
self.version = new_version;
true
} else {
false
};
Ref { inner, updated }
let has_changed = self.version != new_version;

// Mark the shared value as seen by updating the version
self.version = new_version;

Ref { inner, has_changed }
}

/// Checks if this channel contains a message that this receiver has not yet
Expand Down Expand Up @@ -749,9 +789,11 @@ impl<T> Sender<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
// The sender/producer always sees the current value
let updated = false;
Ref { inner, updated }

// The sender/producer always sees the current version
let has_changed = false;

Ref { inner, has_changed }
}

/// Checks if the channel has been closed. This happens when all receivers
Expand Down

0 comments on commit b4fef98

Please sign in to comment.