Skip to content

Commit

Permalink
sync::watch::Receiver: Add wait_for_changed() method
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Sep 24, 2023
1 parent 02aacf5 commit c8e9d19
Showing 1 changed file with 87 additions and 0 deletions.
87 changes: 87 additions & 0 deletions tokio/src/sync/watch.rs
Expand Up @@ -786,6 +786,93 @@ impl<T> Receiver<T> {
}
}

/// Wait for a changed value that satisfies the provided filter condition
///
/// Each value will be received and fed to `filter_fn` at most once. Values
/// might be skipped if they are sent faster than the receiver can process them.
///
/// This method works as [`wait_for()`](Self::wait_for) but it will only
/// consider _changed_ values. It could be called repeatedly in a loop while
/// preserving the _at-most-once_ delivery guarantee. The current value
/// could be included even if it has already been seen by calling
/// [`mark_changed()`](Self::mark_changed) beforehand.
///
/// ```
/// use tokio::sync::watch;
/// use tokio::time::timeout;
///
/// const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1);
///
/// #[tokio::main]
/// async fn main() {
/// let tx = watch::Sender::new("hello");
///
/// let mut rx = tx.subscribe();
///
/// // After the subscribing the current value is not marked as changed.
/// assert!(!rx.borrow().has_changed());
/// assert!(timeout(TIMEOUT, rx.wait_for_changed(|_| true)).await.is_err());
///
/// // Mark the current value as changed.
/// rx.mark_changed();
/// assert_eq!("hello", *rx.wait_for_changed(|val| *val == "hello").await.unwrap());
///
/// // Values that don't satisfy the filter are skipped.
/// tx.send("hello again").unwrap();
/// assert!(timeout(TIMEOUT, rx.wait_for_changed(|val| *val == "hello")).await.is_err());
///
/// // Send a final value and close the channel.
/// tx.send("goodbye").unwrap();
/// drop(tx);
///
/// // The final value is received if not seen before, even if the channel is closed.
/// assert!(rx.has_changed().is_err());
/// assert_eq!("goodbye", *rx.borrow());
/// assert_eq!("goodbye", *rx.wait_for(|val| *val == "goodbye").await.unwrap());
///
/// // All subsequent invocations return immediately with an error.
/// assert!(rx.wait_for_changed(|_| true).await.is_err());
/// assert!(rx.wait_for_changed(|_| false).await.is_err());
/// }
/// ```
pub async fn wait_for_changed(
&mut self,
mut filter_fn: impl FnMut(&T) -> bool,
) -> Result<Ref<'_, T>, error::RecvError> {
loop {
// Wait for the value to change.
changed_impl(&self.shared, &mut self.version).await?;

let inner = self.shared.value.read().unwrap();

// The version has already been updated by `changed_impl`. But it
// needs to be updated again after acquiring the read-lock to ensure
// that the current value matches the version. Otherwise the same
// value could be returned twice, violating the at-most-once guarantee.
let new_version = self.shared.state.load().version();
self.version = new_version;

// Filter the value and catch a possible panic inside the provided closure.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| filter_fn(&inner)));
match result {
Ok(true) => {
return Ok(Ref {
inner,
has_changed: true, // Always changed
});
}
Ok(false) => (), // Continue waiting
Err(panicked) => {
// Drop the lock to avoid poisoning it.
drop(inner);
// Forward the panic to the caller.
panic::resume_unwind(panicked);
// Unreachable
}
};
}
}

/// Returns `true` if receivers belong to the same channel.
///
/// # Examples
Expand Down

0 comments on commit c8e9d19

Please sign in to comment.