diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 184ba4b6041..82bd40b01ea 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -335,12 +335,11 @@ impl Receiver { Ref { inner } } - /// Returns a reference to the most recently sent value and mark that value + /// Returns a reference to the most recently sent value and marks that value /// as seen. /// - /// This method marks the value as seen, so [`changed`] will not return - /// immediately if the newest value is one previously returned by - /// `borrow_and_update`. + /// This method marks the current value as seen. Subsequent calls to [`changed()`] + /// will not return immediately until the producer has modified the shared value again. /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow @@ -365,9 +364,52 @@ impl Receiver { /// /// [`changed`]: Receiver::changed pub fn borrow_and_update(&mut self) -> Ref<'_, T> { + self.borrow_and_update_if_changed().0 + } + + /// Returns a reference to the most recently sent value with an *updated* + /// flag indicating if that value has been detected as *changed* and marked + /// as seen. + /// + /// This method marks the current value as seen. Subsequent calls to [`changed()`] + /// will not return immediately until the producer has modified the shared value again. + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// The priority policy of the lock is dependent on the underlying lock + /// implementation, and this type does not guarantee that any particular policy + /// will be used. In particular, a producer which is waiting to acquire the lock + /// in `send` might or might not block concurrent calls to `borrow`, e.g.: + /// + ///
Potential deadlock example + /// + /// ```text + /// // Task 1 (on thread A) | // Task 2 (on thread B) + /// let (_ref1, _updated1) = | + /// rx1.borrow_and_update_if_changed(); | + /// | // will block + /// | let _ = tx.send(()); + /// // may deadlock | + /// let (_ref2, _updated2) = | + /// rx2.borrow_and_update_if_changed(); | + /// ``` + ///
+ /// + /// [`changed`]: Receiver::changed + pub fn borrow_and_update_if_changed(&mut self) -> (Ref<'_, T>, bool) { let inner = self.shared.value.read().unwrap(); - self.version = self.shared.state.load().version(); - Ref { inner } + // After obtaining a read-lock no concurrent writes could occur + // and the loaded version matches that of the borrowed reference. + let new_version = self.shared.state.load().version(); + let updated = if self.version != new_version { + self.version = new_version; + true + } else { + false + }; + (Ref { inner }, updated) } /// Checks if this channel contains a message that this receiver has not yet