Skip to content

Commit

Permalink
sync::watch: Add Receiver::borrow_and_update_if_changed()
Browse files Browse the repository at this point in the history
This function could otherwise not be implemented in terms of the public
API, because invoking has_changed() and borrow_and_update() independently
is not thread-safe.
  • Loading branch information
uklotzde committed Jun 8, 2022
1 parent 340c4dc commit 6445c44
Showing 1 changed file with 48 additions and 6 deletions.
54 changes: 48 additions & 6 deletions tokio/src/sync/watch.rs
Expand Up @@ -335,12 +335,11 @@ impl<T> Receiver<T> {
Ref { inner }
}

/// Returns a reference to the most recently sent value and mark that value
/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the value as seen, so [`changed`] will not return
/// immediately if the newest value is one previously returned by
/// `borrow_and_update`.
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the producer has modified the shared value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
Expand All @@ -365,9 +364,52 @@ impl<T> Receiver<T> {
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
self.borrow_and_update_if_changed().0
}

/// Returns a reference to the most recently sent value with an *updated*
/// flag indicating if that value has been detected as *changed* and marked
/// as seen.
///
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the producer has modified the shared value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A) | // Task 2 (on thread B)
/// let (_ref1, _updated1) = |
/// rx1.borrow_and_update_if_changed(); |
/// | // will block
/// | let _ = tx.send(());
/// // may deadlock |
/// let (_ref2, _updated2) = |
/// rx2.borrow_and_update_if_changed(); |
/// ```
/// </details>
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update_if_changed(&mut self) -> (Ref<'_, T>, bool) {
let inner = self.shared.value.read().unwrap();
self.version = self.shared.state.load().version();
Ref { inner }
// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let updated = if self.version != new_version {
self.version = new_version;
true
} else {
false
};
(Ref { inner }, updated)
}

/// Checks if this channel contains a message that this receiver has not yet
Expand Down

0 comments on commit 6445c44

Please sign in to comment.