diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 42d417a0fe5..06bda17addf 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -208,12 +208,18 @@ impl Receiver { Self { shared, version } } - /// Returns a reference to the most recently sent value + /// Returns a reference to the most recently sent value. + /// + /// This method does not mark the returned value as seen, so future calls to + /// [`changed`] may return immediately even if you have already seen the + /// value with a call to `borrow`. /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow /// as short lived as possible. /// + /// [`changed`]: Receiver::changed + /// /// # Examples /// /// ``` @@ -227,11 +233,33 @@ impl Receiver { Ref { inner } } - /// Wait for a change notification + /// Returns a reference to the most recently sent value and mark that value + /// as seen. + /// + /// This method marks the value as seen, so [`changed`] will not return + /// immediately if the newest value is one previously returned by + /// `borrow_and_update`. + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// [`changed`]: Receiver::changed + pub fn borrow_and_update(&mut self) -> Ref<'_, T> { + let inner = self.shared.value.read().unwrap(); + self.version = self.shared.version.load(SeqCst); + Ref { inner } + } + + /// Wait for a change notification, then mark the newest value as seen. + /// + /// If the newest value in the channel has not yet been marked seen when + /// this method is called, the method marks that value seen and returns + /// immediately. If the newest value has already been marked seen, then the + /// method sleeps until a new message is sent by the [`Sender`] connected to + /// this `Receiver`, or until the [`Sender`] is dropped. /// - /// Returns when a new value has been sent by the [`Sender`] since the last - /// time `changed()` was called. When the `Sender` half is dropped, `Err` is - /// returned. + /// This method returns an error if and only if the [`Sender`] is dropped. /// /// [`Sender`]: struct@Sender /// @@ -328,10 +356,21 @@ impl Sender { return Err(error::SendError { inner: value }); } - *self.shared.value.write().unwrap() = value; + { + // Acquire the write lock and update the value. + let mut lock = self.shared.value.write().unwrap(); + *lock = value; - // Update the version. 2 is used so that the CLOSED bit is not set. - self.shared.version.fetch_add(2, SeqCst); + // Update the version. 2 is used so that the CLOSED bit is not set. + self.shared.version.fetch_add(2, SeqCst); + + // Release the write lock. + // + // Incrementing the version counter while holding the lock ensures + // that receivers are able to figure out the version number of the + // value they are currently looking at. + drop(lock); + } // Notify all watchers self.shared.notify_rx.notify_waiters();