diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index a98a72cfd56..0ffd1b8193b 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -10,6 +10,41 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// +/// This stream will always start by yielding the current value when the WatchStream is polled, +/// regardles of whether it was the initial value or sent afterwards. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// assert_eq!(rx.next().await, Some("hello")); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::new(rx); +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] @@ -28,7 +63,7 @@ impl WatchStream { /// Create a new `WatchStream`. pub fn new(rx: Receiver) -> Self { Self { - inner: ReusableBoxFuture::new(make_future(rx)), + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } }