diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 23713251e9d..fddb3a5b794 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -110,9 +110,11 @@ impl Barrier { let mut wait = self.wait.clone(); loop { + let _ = wait.changed().await; + // note that the first time through the loop, this _will_ yield a generation // immediately, since we cloned a receiver that has never seen any values. - if wait.recv().await >= generation { + if *wait.borrow() >= generation { break; } } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 5d66512d5ad..d5b07f77584 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -355,10 +355,8 @@ //! let op = my_async_operation(); //! tokio::pin!(op); //! -//! // Receive the **initial** configuration value. As this is the -//! // first time the config is received from the watch, it will -//! // always complete immediatedly. -//! let mut conf = rx.recv().await; +//! // Get the initial config value +//! let mut conf = rx.borrow().clone(); //! //! let mut op_start = Instant::now(); //! let mut delay = time::delay_until(op_start + conf.timeout); @@ -375,8 +373,8 @@ //! // Restart the timeout //! delay = time::delay_until(op_start + conf.timeout); //! } -//! new_conf = rx.recv() => { -//! conf = new_conf; +//! _ = rx.changed() => { +//! conf = rx.borrow().clone(); //! //! // The configuration has been updated. Update the //! // `delay` using the new `timeout` value. diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 84a94823653..1410b08542c 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -121,7 +121,7 @@ struct Waiter { /// Future returned from `notified()` #[derive(Debug)] -struct Notified<'a> { +pub struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, @@ -170,6 +170,12 @@ impl Notify { /// Wait for a notification. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn notified(&self); + /// ``` + /// /// Each `Notify` value holds a single permit. If a permit is available from /// an earlier call to [`notify()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits @@ -197,7 +203,7 @@ impl Notify { /// notify.notify(); /// } /// ``` - pub async fn notified(&self) { + pub fn notified(&self) -> Notified<'_> { Notified { notify: self, state: State::Init, @@ -208,7 +214,6 @@ impl Notify { _p: PhantomPinned, }), } - .await } /// Notifies a waiting task @@ -277,6 +282,45 @@ impl Notify { waker.wake(); } } + + /// Notifies all waiting tasks + pub(crate) fn notify_waiters(&self) { + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock().unwrap(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let curr = self.state.load(SeqCst); + + if let EMPTY | NOTIFIED = curr { + // There are no waiting tasks. In this case, no synchronization is + // established between `notify` and `notified().await`. + return; + } + + // At this point, it is guaranteed that the state will not + // concurrently change, as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get pending waiters + while let Some(mut waiter) = waiters.pop_back() { + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(!waiter.notified); + + waiter.notified = true; + + if let Some(waker) = waiter.waker.take() { + waker.wake(); + } + } + + // All waiters have been notified, the state must be transitioned to + // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be + // held, a `store` is sufficient. + self.state.store(EMPTY, SeqCst); + } } impl Default for Notify { @@ -428,6 +472,8 @@ impl Future for Notified<'_> { waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); *state = Waiting; + + return Poll::Pending; } Waiting => { // Currently in the "Waiting" state, implying the caller has diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs new file mode 100644 index 00000000000..7944cab8d24 --- /dev/null +++ b/tokio/src/sync/tests/loom_watch.rs @@ -0,0 +1,20 @@ +use crate::sync::watch; + +use loom::future::block_on; +use loom::thread; + +#[test] +fn smoke() { + loom::model(|| { + let (tx, mut rx) = watch::channel(1); + + let th = thread::spawn(move || { + tx.send(2).unwrap(); + }); + + block_on(rx.changed()).unwrap(); + assert_eq!(*rx.borrow(), 2); + + th.join().unwrap(); + }) +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index d571754c011..c637cb6b816 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -13,4 +13,5 @@ cfg_loom! { mod loom_oneshot; mod loom_semaphore_batch; mod loom_semaphore_ll; + mod loom_watch; } diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index f6660b6eae9..7d1ac9e8fdc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -6,13 +6,11 @@ //! //! # Usage //! -//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are -//! the producer and sender halves of the channel. The channel is -//! created with an initial value. [`Receiver::recv`] will always -//! be ready upon creation and will yield either this initial value or -//! the latest value that has been sent by `Sender`. -//! -//! Calls to [`Receiver::recv`] will always yield the latest value. +//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer +//! and sender halves of the channel. The channel is created with an initial +//! value. The **latest** value stored in the channel is accessed with +//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new +//! value to sent by the [`Sender`] half. //! //! # Examples //! @@ -23,8 +21,8 @@ //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { -//! while let Some(value) = Some(rx.recv().await) { -//! println!("received = {:?}", value); +//! while rx.changed().await.is_ok() { +//! println!("received = {:?}", *rx.borrow()); //! } //! }); //! @@ -47,20 +45,17 @@ //! //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver -//! [`Receiver::recv`]: crate::sync::watch::Receiver::recv +//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed +//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow //! [`channel`]: crate::sync::watch::channel //! [`Sender::closed`]: crate::sync::watch::Sender::closed -use crate::future::poll_fn; -use crate::sync::task::AtomicWaker; +use crate::sync::Notify; -use fnv::FnvHashSet; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; -use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; /// Receives values from the associated [`Sender`](struct@Sender). /// @@ -70,8 +65,8 @@ pub struct Receiver { /// Pointer to the shared state shared: Arc>, - /// Pointer to the watcher's internal state - inner: Watcher, + /// Last observed version + version: usize, } /// Sends values to the associated [`Receiver`](struct@Receiver). @@ -79,7 +74,7 @@ pub struct Receiver { /// Instances are created by the [`channel`](fn@channel) function. #[derive(Debug)] pub struct Sender { - shared: Weak>, + shared: Arc>, } /// Returns a reference to the inner value @@ -92,6 +87,27 @@ pub struct Ref<'a, T> { inner: RwLockReadGuard<'a, T>, } +#[derive(Debug)] +struct Shared { + /// The most recent value + value: RwLock, + + /// The current version + /// + /// The lowest bit represents a "closed" state. The rest of the bits + /// represent the current version. + version: AtomicUsize, + + /// Tracks the number of `Receiver` instances + ref_count_rx: AtomicUsize, + + /// Notifies waiting receivers that the value changed. + notify_rx: Notify, + + /// Notifies any task listening for `Receiver` dropped events + notify_tx: Notify, +} + pub mod error { //! Watch error types @@ -112,37 +128,20 @@ pub mod error { } impl std::error::Error for SendError {} -} - -#[derive(Debug)] -struct Shared { - /// The most recent value - value: RwLock, - - /// The current version - /// - /// The lowest bit represents a "closed" state. The rest of the bits - /// represent the current version. - version: AtomicUsize, - /// All watchers - watchers: Mutex, - - /// Task to notify when all watchers drop - cancel: AtomicWaker, -} + /// Error produced when receiving a change notification. + #[derive(Debug)] + pub struct RecvError(pub(super) ()); -type Watchers = FnvHashSet; + // ===== impl RecvError ===== -/// The watcher's ID is based on the Arc's pointer. -#[derive(Clone, Debug)] -struct Watcher(Arc); + impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } + } -#[derive(Debug)] -struct WatchInner { - /// Last observed version - version: AtomicUsize, - waker: AtomicWaker, + impl std::error::Error for RecvError {} } const CLOSED: usize = 1; @@ -162,8 +161,8 @@ const CLOSED: usize = 1; /// let (tx, mut rx) = watch::channel("hello"); /// /// tokio::spawn(async move { -/// while let Some(value) = Some(rx.recv().await) { -/// println!("received = {:?}", value); +/// while rx.changed().await.is_ok() { +/// println!("received = {:?}", *rx.borrow()); /// } /// }); /// @@ -174,29 +173,20 @@ const CLOSED: usize = 1; /// /// [`Sender`]: struct@Sender /// [`Receiver`]: struct@Receiver -pub fn channel(init: T) -> (Sender, Receiver) { - const VERSION_0: usize = 0; - const VERSION_1: usize = 2; - - // We don't start knowing VERSION_1 - let inner = Watcher::new_version(VERSION_0); - - // Insert the watcher - let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default()); - watchers.insert(inner.clone()); - +pub fn channel(init: T) -> (Sender, Receiver) { let shared = Arc::new(Shared { value: RwLock::new(init), - version: AtomicUsize::new(VERSION_1), - watchers: Mutex::new(watchers), - cancel: AtomicWaker::new(), + version: AtomicUsize::new(0), + ref_count_rx: AtomicUsize::new(1), + notify_rx: Notify::new(), + notify_tx: Notify::new(), }); let tx = Sender { - shared: Arc::downgrade(&shared), + shared: shared.clone(), }; - let rx = Receiver { shared, inner }; + let rx = Receiver { shared, version: 0 }; (tx, rx) } @@ -221,41 +211,13 @@ impl Receiver { Ref { inner } } - // TODO: document - #[doc(hidden)] - pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> { - // Make sure the task is up to date - self.inner.waker.register_by_ref(cx.waker()); - - let state = self.shared.version.load(SeqCst); - let version = state & !CLOSED; - - if self.inner.version.swap(version, Relaxed) != version { - let inner = self.shared.value.read().unwrap(); - - return Ready(Ref { inner }); - } - - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - let inner = self.shared.value.read().unwrap(); - - return Ready(Ref { inner }); - } - - Pending - } -} - -impl Receiver { - /// Attempts to clone the latest value sent via the channel. + /// Wait for a change notification /// - /// If this is the first time the function is called on a `Receiver` - /// instance, then the function completes immediately with the **current** - /// value held by the channel. On the next call, the function waits until - /// a new value is sent in the channel. + /// Returns when a new value has been sent by the [`Sender`] since the last + /// time `changed()` was called. When the `Sender` half is dropped, `Err` is + /// returned. /// - /// `None` is returned if the `Sender` half is dropped. + /// [`Sender`]: struct@Sender /// /// # Examples /// @@ -266,79 +228,110 @@ impl Receiver { /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// - /// let v = rx.recv().await; - /// assert_eq!(v, "hello"); - /// /// tokio::spawn(async move { /// tx.send("goodbye").unwrap(); /// }); /// - /// // Waits for the new task to spawn and send the value. - /// let v = rx.recv().await; - /// assert_eq!(v, "goodbye"); + /// assert!(rx.changed().await.is_ok()); + /// assert_eq!(*rx.borrow(), "goodbye"); /// - /// let v = rx.recv().await; - /// assert_eq!(v, "goodbye"); + /// // The `tx` handle has been dropped + /// assert!(rx.changed().await.is_err()); /// } /// ``` - pub async fn recv(&mut self) -> T { - poll_fn(|cx| { - let v_ref = ready!(self.poll_recv_ref(cx)); - Poll::Ready((*v_ref).clone()) + pub async fn changed(&mut self) -> Result<(), error::RecvError> { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. Requesting the notification + // requires polling the future once. + let notified = self.shared.notify_rx.notified(); + pin!(notified); + + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) }) - .await + .await; + + if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { + return ret; + } + + notified.await; + + maybe_changed(&self.shared, &mut self.version) + .expect("[bug] failed to observe change after notificaton.") } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for Receiver { - type Item = T; - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let v_ref = ready!(self.poll_recv_ref(cx)); +fn maybe_changed( + shared: &Shared, + version: &mut usize, +) -> Option> { + // Load the version from the state + let state = shared.version.load(SeqCst); + let new_version = state & !CLOSED; + + if *version != new_version { + // Observe the new version and return + *version = new_version; + return Some(Ok(())); + } - Poll::Ready(Some((*v_ref).clone())) + if CLOSED == state & CLOSED { + // All receivers have dropped. + return Some(Err(error::RecvError(()))); } + + None } impl Clone for Receiver { fn clone(&self) -> Self { - let ver = self.inner.version.load(Relaxed); - let inner = Watcher::new_version(ver); + let version = self.version; let shared = self.shared.clone(); - shared.watchers.lock().unwrap().insert(inner.clone()); + // No synchronization necessary as this is only used as a counter and + // not memory access. + shared.ref_count_rx.fetch_add(1, Relaxed); - Receiver { shared, inner } + Receiver { version, shared } } } impl Drop for Receiver { fn drop(&mut self) { - self.shared.watchers.lock().unwrap().remove(&self.inner); + // No synchronization necessary as this is only used as a counter and + // not memory access. + if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { + // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` + self.shared.notify_tx.notify_waiters(); + } } } impl Sender { /// Sends a new value via the channel, notifying all receivers. pub fn send(&self, value: T) -> Result<(), error::SendError> { - let shared = match self.shared.upgrade() { - Some(shared) => shared, - // All `Watch` handles have been canceled - None => return Err(error::SendError { inner: value }), - }; - - // Replace the value - { - let mut lock = shared.value.write().unwrap(); - *lock = value; + // This is pretty much only useful as a hint anyway, so synchronization isn't critical. + if 0 == self.shared.ref_count_rx.load(Relaxed) { + return Err(error::SendError { inner: value }); } + *self.shared.value.write().unwrap() = value; + // Update the version. 2 is used so that the CLOSED bit is not set. - shared.version.fetch_add(2, SeqCst); + self.shared.version.fetch_add(2, SeqCst); // Notify all watchers - notify_all(&*shared); + self.shared.notify_rx.notify_waiters(); Ok(()) } @@ -347,37 +340,42 @@ impl Sender { /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. - pub async fn closed(&mut self) { - poll_fn(|cx| self.poll_close(cx)).await - } + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = watch::channel("hello"); + /// + /// tokio::spawn(async move { + /// // use `rx` + /// drop(rx); + /// }); + /// + /// // Waits for `rx` to drop + /// tx.closed().await; + /// println!("the `rx` handles dropped") + /// } + /// ``` + pub async fn closed(&self) { + let notified = self.shared.notify_tx.notified(); - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { - match self.shared.upgrade() { - Some(shared) => { - shared.cancel.register_by_ref(cx.waker()); - Pending - } - None => Ready(()), + if self.shared.ref_count_rx.load(Relaxed) == 0 { + return; } - } -} - -/// Notifies all watchers of a change -fn notify_all(shared: &Shared) { - let watchers = shared.watchers.lock().unwrap(); - for watcher in watchers.iter() { - // Notify the task - watcher.waker.wake(); + notified.await; + debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); } } impl Drop for Sender { fn drop(&mut self) { - if let Some(shared) = self.shared.upgrade() { - shared.version.fetch_or(CLOSED, SeqCst); - notify_all(&*shared); - } + self.shared.version.fetch_or(CLOSED, SeqCst); + self.shared.notify_rx.notify_waiters(); } } @@ -390,44 +388,3 @@ impl ops::Deref for Ref<'_, T> { self.inner.deref() } } - -// ===== impl Shared ===== - -impl Drop for Shared { - fn drop(&mut self) { - self.cancel.wake(); - } -} - -// ===== impl Watcher ===== - -impl Watcher { - fn new_version(version: usize) -> Self { - Watcher(Arc::new(WatchInner { - version: AtomicUsize::new(version), - waker: AtomicWaker::new(), - })) - } -} - -impl std::cmp::PartialEq for Watcher { - fn eq(&self, other: &Watcher) -> bool { - Arc::ptr_eq(&self.0, &other.0) - } -} - -impl std::cmp::Eq for Watcher {} - -impl std::hash::Hash for Watcher { - fn hash(&self, state: &mut H) { - (&*self.0 as *const WatchInner).hash(state) - } -} - -impl std::ops::Deref for Watcher { - type Target = WatchInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 45d11bd441a..eb33b70a770 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -206,7 +206,7 @@ async_assert_fn!(tokio::sync::Mutex>::lock(_): !Send & !Sync); async_assert_fn!(tokio::sync::Mutex::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::read(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::write(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock>::read(_): !Send & !Sync); @@ -230,9 +230,7 @@ async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver::recv(_): Send & Sync) async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): Send & Sync); async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Receiver::changed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 5d550443f5b..9dcb0c530fa 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -4,41 +4,41 @@ use tokio::sync::watch; use tokio_test::task::spawn; -use tokio_test::{assert_pending, assert_ready}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; #[test] fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); { - let mut t = spawn(rx.recv()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "one"); + // Not initially notified + let mut t = spawn(rx.changed()); + assert_pending!(t.poll()); } + assert_eq!(*rx.borrow(), "one"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); tx.send("two").unwrap(); assert!(t.is_woken()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "two"); + assert_ready_ok!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); drop(tx); - let res = assert_ready!(t.poll()); - assert_eq!(res, "two"); + assert!(t.is_woken()); + assert_ready_err!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); } #[test] @@ -47,20 +47,19 @@ fn multi_rx() { let mut rx2 = rx1.clone(); { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); - - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "one"); + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); } + assert_eq!(*rx1.borrow(), "one"); + assert_eq!(*rx2.borrow(), "one"); - let mut t2 = spawn(rx2.recv()); + let mut t2 = spawn(rx2.changed()); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -70,12 +69,12 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx1.borrow(), "two"); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); @@ -84,45 +83,29 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "three"); drop(t2); + assert_eq!(*rx2.borrow(), "three"); + { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); tx.send("four").unwrap(); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - drop(t1); - - let mut t1 = spawn(rx1.recv()); - assert_pending!(t1.poll()); - - drop(tx); - - assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); - - drop(t2); - let mut t2 = spawn(rx2.recv()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "four"); + assert_eq!(*rx2.borrow(), "four"); } #[test] @@ -133,16 +116,10 @@ fn rx_observes_final_value() { drop(tx); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); - } - - { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "one"); // Sending a value @@ -151,13 +128,13 @@ fn rx_observes_final_value() { tx.send("two").unwrap(); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + let mut t1 = spawn(rx.changed()); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t1 = spawn(rx.recv()); + let mut t1 = spawn(rx.changed()); assert_pending!(t1.poll()); tx.send("three").unwrap(); @@ -165,20 +142,20 @@ fn rx_observes_final_value() { assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); } #[test] fn poll_close() { - let (mut tx, rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); { let mut t = spawn(tx.closed()); @@ -192,40 +169,3 @@ fn poll_close() { assert!(tx.send("two").is_err()); } - -#[test] -fn stream_impl() { - use tokio::stream::StreamExt; - - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.next()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "one"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - tx.send("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "two"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert_eq!(res, Some("two")); - } -}