From 5833d3f7c8016ecbaec51a44d47227f96bca8dfd Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Thu, 4 Mar 2021 04:11:17 -0800 Subject: [PATCH 1/3] tokio-stream: WatchStream: yield the current value This udptes WatchStream to always yield the current value before waiting for updates. This makes it possible to use it as a drop-in replacement for watch::Receiver when updating a Tokio 0.2 application. There is one caveat here, which is that we can't tell if the sender was used before. This means we might yield the current value twice if it was (because reading what we think is the initial value doesn't increment the watcher). I don't think there really is a way to fix this without changing the API of Receiver, so for now I went with documenting it. Let me know what you think. More context: https://github.com/tokio-rs/tokio/discussions/3575 --- tokio-stream/src/wrappers/watch.rs | 58 +++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index a98a72cfd56..641133c4551 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -10,25 +10,74 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// +/// This stream will always start by yielding the value present the Receiver when the WatchStream +/// is constructed. As such, you are advised to construct the WatchStream before using the Sender. +/// If you don't, you may receive the current value twice. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// +/// let mut rx = WatchStream::new(rx); +/// tx.send("goodbye").unwrap(); +/// +/// assert_eq!(rx.next().await, Some("hello")); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// use tokio::sync::watch; +/// +/// let (tx, rx) = watch::channel("hello"); +/// +/// // NOT RECOMMENDED! +/// tx.send("goodbye").unwrap(); +/// let mut rx = WatchStream::new(rx); +/// +/// tokio::task::spawn(async move { +/// tokio::time::sleep(std::time::Duration::from_millis(10)).await; +/// tx.send("hello again").unwrap(); +/// }); +/// +/// // goodbye will be received twice +/// assert_eq!(rx.next().await, Some("goodbye")); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// assert_eq!(rx.next().await, Some("hello again")); +/// # } +/// ``` +/// /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream { - inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver)>, + inner: ReusableBoxFuture<(Result, Receiver)>, } async fn make_future( mut rx: Receiver, -) -> (Result<(), RecvError>, Receiver) { +) -> (Result, Receiver) { let result = rx.changed().await; + let result = result.map(|()| (*rx.borrow()).clone()); (result, rx) } impl WatchStream { /// Create a new `WatchStream`. pub fn new(rx: Receiver) -> Self { + let initial = (*rx.borrow()).clone(); + Self { - inner: ReusableBoxFuture::new(make_future(rx)), + inner: ReusableBoxFuture::new(async move { (Ok(initial), rx) }), } } } @@ -39,8 +88,7 @@ impl Stream for WatchStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (result, rx) = ready!(self.inner.poll(cx)); match result { - Ok(_) => { - let received = (*rx.borrow()).clone(); + Ok(received) => { self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } From 80a1314720308737b2dfd028935c2be19ebf341d Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Thu, 4 Mar 2021 09:11:19 -0800 Subject: [PATCH 2/3] dont necessarily return the initial value --- tokio-stream/src/wrappers/watch.rs | 31 +++++++++--------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 641133c4551..c3109d33cc3 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -10,9 +10,8 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the value present the Receiver when the WatchStream -/// is constructed. As such, you are advised to construct the WatchStream before using the Sender. -/// If you don't, you may receive the current value twice. +/// This stream will always start by yielding the current value when the WatchStream is polled, +/// regardles of whether it was the initial value or sent afterwards. /// /// # Examples /// @@ -23,11 +22,11 @@ use tokio::sync::watch::error::RecvError; /// use tokio::sync::watch; /// /// let (tx, rx) = watch::channel("hello"); -/// /// let mut rx = WatchStream::new(rx); -/// tx.send("goodbye").unwrap(); /// /// assert_eq!(rx.next().await, Some("hello")); +/// +/// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } /// ``` @@ -39,20 +38,10 @@ use tokio::sync::watch::error::RecvError; /// use tokio::sync::watch; /// /// let (tx, rx) = watch::channel("hello"); -/// -/// // NOT RECOMMENDED! /// tx.send("goodbye").unwrap(); /// let mut rx = WatchStream::new(rx); /// -/// tokio::task::spawn(async move { -/// tokio::time::sleep(std::time::Duration::from_millis(10)).await; -/// tx.send("hello again").unwrap(); -/// }); -/// -/// // goodbye will be received twice /// assert_eq!(rx.next().await, Some("goodbye")); -/// assert_eq!(rx.next().await, Some("goodbye")); -/// assert_eq!(rx.next().await, Some("hello again")); /// # } /// ``` /// @@ -60,24 +49,21 @@ use tokio::sync::watch::error::RecvError; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream { - inner: ReusableBoxFuture<(Result, Receiver)>, + inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver)>, } async fn make_future( mut rx: Receiver, -) -> (Result, Receiver) { +) -> (Result<(), RecvError>, Receiver) { let result = rx.changed().await; - let result = result.map(|()| (*rx.borrow()).clone()); (result, rx) } impl WatchStream { /// Create a new `WatchStream`. pub fn new(rx: Receiver) -> Self { - let initial = (*rx.borrow()).clone(); - Self { - inner: ReusableBoxFuture::new(async move { (Ok(initial), rx) }), + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } } @@ -88,7 +74,8 @@ impl Stream for WatchStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (result, rx) = ready!(self.inner.poll(cx)); match result { - Ok(received) => { + Ok(_) => { + let received = (*rx.borrow()).clone(); self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } From 62f2de44ce5d7426b215005833d8c468bdb5912d Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Thu, 4 Mar 2021 09:18:57 -0800 Subject: [PATCH 3/3] swap lines --- tokio-stream/src/wrappers/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index c3109d33cc3..0ffd1b8193b 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -38,9 +38,9 @@ use tokio::sync::watch::error::RecvError; /// use tokio::sync::watch; /// /// let (tx, rx) = watch::channel("hello"); -/// tx.send("goodbye").unwrap(); /// let mut rx = WatchStream::new(rx); /// +/// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } /// ```