From 5ef2db6d1421377062833fb3ed21b688a57a2283 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 26 May 2021 10:11:38 +0200 Subject: [PATCH 1/2] sync: add `Receiver::borrow_and_update` --- tokio/src/sync/watch.rs | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 42d417a0fe5..2e2b4f71a56 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -208,7 +208,7 @@ impl Receiver { Self { shared, version } } - /// Returns a reference to the most recently sent value + /// Returns a reference to the most recently sent value. /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow @@ -227,6 +227,23 @@ impl Receiver { Ref { inner } } + /// Returns a reference to the most recently sent value and mark that value + /// as seen. + /// + /// Any future calls to [`changed`] on this receiver will return immediately + /// if no new messages have been sent since `borrow_and_update` was called. + /// + /// Outstanding borrows hold a read lock. This means that long lived borrows + /// could cause the send half to block. It is recommended to keep the borrow + /// as short lived as possible. + /// + /// [`changed`]: Receiver::changed + pub fn borrow_and_update(&mut self) -> Ref<'_, T> { + let inner = self.shared.value.read().unwrap(); + self.version = self.shared.version.load(SeqCst); + Ref { inner } + } + /// Wait for a change notification /// /// Returns when a new value has been sent by the [`Sender`] since the last @@ -328,10 +345,21 @@ impl Sender { return Err(error::SendError { inner: value }); } - *self.shared.value.write().unwrap() = value; + { + // Acquire the write lock and update the value. + let mut lock = self.shared.value.write().unwrap(); + *lock = value; + + // Update the version. 2 is used so that the CLOSED bit is not set. + self.shared.version.fetch_add(2, SeqCst); - // Update the version. 2 is used so that the CLOSED bit is not set. - self.shared.version.fetch_add(2, SeqCst); + // Release the write lock. + // + // Incrementing the version counter while holding the lock ensures + // that receivers are able to figure out the version number of the + // value they are currently looking at. + drop(lock); + } // Notify all watchers self.shared.notify_rx.notify_waiters(); From 647674de8492becc933a9668fe4d39f6c27c0370 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 16 Jun 2021 12:34:59 +0200 Subject: [PATCH 2/2] update documentation --- tokio/src/sync/watch.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 2e2b4f71a56..06bda17addf 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -210,10 +210,16 @@ impl Receiver { /// Returns a reference to the most recently sent value. /// + /// This method does not mark the returned value as seen, so future calls to + /// [`changed`] may return immediately even if you have already seen the + /// value with a call to `borrow`. + /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow /// as short lived as possible. /// + /// [`changed`]: Receiver::changed + /// /// # Examples /// /// ``` @@ -230,8 +236,9 @@ impl Receiver { /// Returns a reference to the most recently sent value and mark that value /// as seen. /// - /// Any future calls to [`changed`] on this receiver will return immediately - /// if no new messages have been sent since `borrow_and_update` was called. + /// This method marks the value as seen, so [`changed`] will not return + /// immediately if the newest value is one previously returned by + /// `borrow_and_update`. /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow @@ -244,11 +251,15 @@ impl Receiver { Ref { inner } } - /// Wait for a change notification + /// Wait for a change notification, then mark the newest value as seen. + /// + /// If the newest value in the channel has not yet been marked seen when + /// this method is called, the method marks that value seen and returns + /// immediately. If the newest value has already been marked seen, then the + /// method sleeps until a new message is sent by the [`Sender`] connected to + /// this `Receiver`, or until the [`Sender`] is dropped. /// - /// Returns when a new value has been sent by the [`Sender`] since the last - /// time `changed()` was called. When the `Sender` half is dropped, `Err` is - /// returned. + /// This method returns an error if and only if the [`Sender`] is dropped. /// /// [`Sender`]: struct@Sender ///