Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync::watch: Add has_changed flag and method to Ref #4758

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 76 additions & 8 deletions tokio/src/sync/watch.rs
Expand Up @@ -112,6 +112,56 @@ pub struct Sender<T> {
#[derive(Debug)]
pub struct Ref<'a, T> {
inner: RwLockReadGuard<'a, T>,
has_changed: bool,
}

impl<'a, T> Ref<'a, T> {
/// Indicates if the borrowed value is considered as _changed_ since the last
/// time it has been marked as seen.
///
/// Other than [`Receiver::has_changed()`] it does not fail if the [`Receiver`]
/// is orphaned after the [`Sender`] has been dropped.
uklotzde marked this conversation as resolved.
Show resolved Hide resolved
///
/// When borrowed from the [`Sender`] this function will always return `false`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
/// // The sender does never consider the value as changed.
/// assert!(!tx.borrow().has_changed());
///
/// // Drop the sender immediately, just for testing purposes.
/// drop(tx);
///
/// // Even if the sender has already been dropped...
/// assert!(rx.has_changed().is_err());
/// // ...the modified value is still readable and detected as changed.
/// assert_eq!(*rx.borrow(), "goodbye");
/// assert!(rx.borrow().has_changed());
///
/// // Read the changed value and mark it as seen.
/// {
/// let received = rx.borrow_and_update();
/// assert_eq!(*received, "goodbye");
/// assert!(received.has_changed());
/// // Release the read lock when leaving this scope.
/// }
///
/// // Now the value has already been marked as seen and could
/// // never be modified again (after the sender has been dropped).
/// assert!(!rx.borrow().has_changed());
/// }
/// ```
pub fn has_changed(&self) -> bool {
self.has_changed
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -332,15 +382,21 @@ impl<T> Receiver<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }

// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;

Ref { inner, has_changed }
}

/// Returns a reference to the most recently sent value and mark that value
/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the value as seen, so [`changed`] will not return
/// immediately if the newest value is one previously returned by
/// `borrow_and_update`.
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the [`Sender`] has modified the shared
/// value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
Expand All @@ -366,8 +422,16 @@ impl<T> Receiver<T> {
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
self.version = self.shared.state.load().version();
Ref { inner }

// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;

// Mark the shared value as seen by updating the version
self.version = new_version;

Ref { inner, has_changed }
}

/// Checks if this channel contains a message that this receiver has not yet
Expand Down Expand Up @@ -725,7 +789,11 @@ impl<T> Sender<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }

// The sender/producer always sees the current version
let has_changed = false;

Ref { inner, has_changed }
}

/// Checks if the channel has been closed. This happens when all receivers
Expand Down