Skip to content

Commit

Permalink
sync: yield initial value in WatchStream (#3576)
Browse files Browse the repository at this point in the history
  • Loading branch information
krallin committed Mar 5, 2021
1 parent e06b257 commit 47be928
Showing 1 changed file with 36 additions and 1 deletion.
37 changes: 36 additions & 1 deletion tokio-stream/src/wrappers/watch.rs
Expand Up @@ -10,6 +10,41 @@ use tokio::sync::watch::error::RecvError;

/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the current value when the WatchStream is polled,
/// regardles of whether it was the initial value or sent afterwards.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::WatchStream};
/// use tokio::sync::watch;
///
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::new(rx);
///
/// assert_eq!(rx.next().await, Some("hello"));
///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
/// ```
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::WatchStream};
/// use tokio::sync::watch;
///
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::new(rx);
///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
/// ```
///
/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
Expand All @@ -28,7 +63,7 @@ impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
inner: ReusableBoxFuture::new(make_future(rx)),
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
}
Expand Down

1 comment on commit 47be928

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 47be928 Previous: e06b257 Ratio
send_large 145287 ns/iter (± 15065) 68280 ns/iter (± 2640) 2.13

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

Please sign in to comment.