Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add Receiver::borrow_and_update #3813

Merged
merged 2 commits into from Jun 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 47 additions & 8 deletions tokio/src/sync/watch.rs
Expand Up @@ -208,12 +208,18 @@ impl<T> Receiver<T> {
Self { shared, version }
}

/// Returns a reference to the most recently sent value
/// Returns a reference to the most recently sent value.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// [`changed`]: Receiver::changed
///
/// # Examples
///
/// ```
Expand All @@ -227,11 +233,33 @@ impl<T> Receiver<T> {
Ref { inner }
}

/// Wait for a change notification
/// Returns a reference to the most recently sent value and mark that value
/// as seen.
///
/// This method marks the value as seen, so [`changed`] will not return
/// immediately if the newest value is one previously returned by
/// `borrow_and_update`.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
self.version = self.shared.version.load(SeqCst);
Ref { inner }
}

/// Wait for a change notification, then mark the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// method sleeps until a new message is sent by the [`Sender`] connected to
/// this `Receiver`, or until the [`Sender`] is dropped.
///
/// Returns when a new value has been sent by the [`Sender`] since the last
/// time `changed()` was called. When the `Sender` half is dropped, `Err` is
/// returned.
/// This method returns an error if and only if the [`Sender`] is dropped.
///
/// [`Sender`]: struct@Sender
///
Expand Down Expand Up @@ -328,10 +356,21 @@ impl<T> Sender<T> {
return Err(error::SendError { inner: value });
}

*self.shared.value.write().unwrap() = value;
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
*lock = value;

// Update the version. 2 is used so that the CLOSED bit is not set.
self.shared.version.fetch_add(2, SeqCst);
// Update the version. 2 is used so that the CLOSED bit is not set.
self.shared.version.fetch_add(2, SeqCst);

// Release the write lock.
//
// Incrementing the version counter while holding the lock ensures
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
}

// Notify all watchers
self.shared.notify_rx.notify_waiters();
Expand Down