Skip to content

Commit

Permalink
sync: add Receiver::borrow_and_update (#3813)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Jun 16, 2021
1 parent e979ad7 commit d1aa2df
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions tokio/src/sync/watch.rs
Expand Up @@ -208,12 +208,18 @@ impl<T> Receiver<T> {
Self { shared, version }
}

/// Returns a reference to the most recently sent value
/// Returns a reference to the most recently sent value.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// [`changed`]: Receiver::changed
///
/// # Examples
///
/// ```
Expand All @@ -227,11 +233,33 @@ impl<T> Receiver<T> {
Ref { inner }
}

/// Wait for a change notification
/// Returns a reference to the most recently sent value and mark that value
/// as seen.
///
/// This method marks the value as seen, so [`changed`] will not return
/// immediately if the newest value is one previously returned by
/// `borrow_and_update`.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
self.version = self.shared.version.load(SeqCst);
Ref { inner }
}

/// Wait for a change notification, then mark the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// method sleeps until a new message is sent by the [`Sender`] connected to
/// this `Receiver`, or until the [`Sender`] is dropped.
///
/// Returns when a new value has been sent by the [`Sender`] since the last
/// time `changed()` was called. When the `Sender` half is dropped, `Err` is
/// returned.
/// This method returns an error if and only if the [`Sender`] is dropped.
///
/// [`Sender`]: struct@Sender
///
Expand Down Expand Up @@ -328,10 +356,21 @@ impl<T> Sender<T> {
return Err(error::SendError { inner: value });
}

*self.shared.value.write().unwrap() = value;
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
*lock = value;

// Update the version. 2 is used so that the CLOSED bit is not set.
self.shared.version.fetch_add(2, SeqCst);
// Update the version. 2 is used so that the CLOSED bit is not set.
self.shared.version.fetch_add(2, SeqCst);

// Release the write lock.
//
// Incrementing the version counter while holding the lock ensures
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
}

// Notify all watchers
self.shared.notify_rx.notify_waiters();
Expand Down

0 comments on commit d1aa2df

Please sign in to comment.