Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch channel breaking changes #2806

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio/src/sync/barrier.rs
Expand Up @@ -96,7 +96,7 @@ impl Barrier {
// wake everyone, increment the generation, and return
state
.waker
.broadcast(state.generation)
.send(state.generation)
.expect("there is at least one receiver");
state.arrived = 0;
state.generation += 1;
Expand All @@ -112,7 +112,7 @@ impl Barrier {
loop {
// note that the first time through the loop, this _will_ yield a generation
// immediately, since we cloned a receiver that has never seen any values.
if wait.recv().await.expect("sender hasn't been closed") >= generation {
if wait.recv().await >= generation {
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/sync/mod.rs
Expand Up @@ -330,7 +330,7 @@
//! // If the configuration changed, send the new config value
//! // on the watch channel.
//! if new_config != config {
//! tx.broadcast(new_config.clone()).unwrap();
//! tx.send(new_config.clone()).unwrap();
//! config = new_config;
//! }
//! }
Expand Down Expand Up @@ -358,7 +358,7 @@
//! // Receive the **initial** configuration value. As this is the
//! // first time the config is received from the watch, it will
//! // always complete immediatedly.
//! let mut conf = rx.recv().await.unwrap();
//! let mut conf = rx.recv().await;
//!
//! let mut op_start = Instant::now();
//! let mut delay = time::delay_until(op_start + conf.timeout);
Expand All @@ -376,7 +376,7 @@
//! delay = time::delay_until(op_start + conf.timeout);
//! }
//! new_conf = rx.recv() => {
//! conf = new_conf.unwrap();
//! conf = new_conf;
//!
//! // The configuration has been updated. Update the
//! // `delay` using the new `timeout` value.
Expand Down
34 changes: 18 additions & 16 deletions tokio/src/sync/watch.rs
Expand Up @@ -23,12 +23,12 @@
//! let (tx, mut rx) = watch::channel("hello");
//!
//! tokio::spawn(async move {
//! while let Some(value) = rx.recv().await {
//! while let Some(value) = Some(rx.recv().await) {
//! println!("received = {:?}", value);
//! }
//! });
//!
//! tx.broadcast("world")?;
//! tx.send("world")?;
//! # Ok(())
//! # }
//! ```
Expand Down Expand Up @@ -162,12 +162,12 @@ const CLOSED: usize = 1;
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
/// while let Some(value) = rx.recv().await {
/// while let Some(value) = Some(rx.recv().await) {
/// println!("received = {:?}", value);
/// }
/// });
///
/// tx.broadcast("world")?;
/// tx.send("world")?;
/// # Ok(())
/// # }
/// ```
Expand Down Expand Up @@ -223,7 +223,7 @@ impl<T> Receiver<T> {

// TODO: document
#[doc(hidden)]
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Ref<'a, T>> {
// Make sure the task is up to date
self.inner.waker.register_by_ref(cx.waker());

Expand All @@ -233,12 +233,14 @@ impl<T> Receiver<T> {
if self.inner.version.swap(version, Relaxed) != version {
let inner = self.shared.value.read().unwrap();

return Ready(Some(Ref { inner }));
return Ready(Ref { inner });
}

if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
return Ready(None);
let inner = self.shared.value.read().unwrap();

return Ready(Ref { inner });
}

Pending
Expand All @@ -264,25 +266,25 @@ impl<T: Clone> Receiver<T> {
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// let v = rx.recv().await.unwrap();
/// let v = rx.recv().await;
/// assert_eq!(v, "hello");
///
/// tokio::spawn(async move {
/// tx.broadcast("goodbye").unwrap();
/// tx.send("goodbye").unwrap();
/// });
///
/// // Waits for the new task to spawn and send the value.
/// let v = rx.recv().await.unwrap();
/// let v = rx.recv().await;
/// assert_eq!(v, "goodbye");
///
/// let v = rx.recv().await;
/// assert!(v.is_none());
/// assert_eq!(v, "goodbye");
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
pub async fn recv(&mut self) -> T {
poll_fn(|cx| {
let v_ref = ready!(self.poll_recv_ref(cx));
Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
Poll::Ready((*v_ref).clone())
})
.await
}
Expand All @@ -295,7 +297,7 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> {
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let v_ref = ready!(self.poll_recv_ref(cx));

Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
Poll::Ready(Some((*v_ref).clone()))
}
}

Expand All @@ -318,8 +320,8 @@ impl<T> Drop for Receiver<T> {
}

impl<T> Sender<T> {
/// Broadcasts a new value via the channel, notifying all receivers.
pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
/// Sends a new value via the channel, notifying all receivers.
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
// All `Watch` handles have been canceled
Expand Down
52 changes: 26 additions & 26 deletions tokio/tests/sync_watch.rs
Expand Up @@ -12,7 +12,7 @@ fn single_rx_recv() {

{
let mut t = spawn(rx.recv());
let v = assert_ready!(t.poll()).unwrap();
let v = assert_ready!(t.poll());
assert_eq!(v, "one");
}

Expand All @@ -21,11 +21,11 @@ fn single_rx_recv() {

assert_pending!(t.poll());

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

assert!(t.is_woken());

let v = assert_ready!(t.poll()).unwrap();
let v = assert_ready!(t.poll());
assert_eq!(v, "two");
}

Expand All @@ -37,7 +37,7 @@ fn single_rx_recv() {
drop(tx);

let res = assert_ready!(t.poll());
assert!(res.is_none());
assert_eq!(res, "two");
}
}

Expand All @@ -51,10 +51,10 @@ fn multi_rx() {
let mut t2 = spawn(rx2.recv());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "one");
assert_eq!(res, "one");

let res = assert_ready!(t2.poll());
assert_eq!(res.unwrap(), "one");
assert_eq!(res, "one");
}

let mut t2 = spawn(rx2.recv());
Expand All @@ -65,30 +65,30 @@ fn multi_rx() {
assert_pending!(t1.poll());
assert_pending!(t2.poll());

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

assert!(t1.is_woken());
assert!(t2.is_woken());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "two");
assert_eq!(res, "two");
}

{
let mut t1 = spawn(rx1.recv());

assert_pending!(t1.poll());

tx.broadcast("three").unwrap();
tx.send("three").unwrap();

assert!(t1.is_woken());
assert!(t2.is_woken());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "three");
assert_eq!(res, "three");

let res = assert_ready!(t2.poll());
assert_eq!(res.unwrap(), "three");
assert_eq!(res, "three");
}

drop(t2);
Expand All @@ -100,10 +100,10 @@ fn multi_rx() {
assert_pending!(t1.poll());
assert_pending!(t2.poll());

tx.broadcast("four").unwrap();
tx.send("four").unwrap();

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "four");
assert_eq!(res, "four");
drop(t1);

let mut t1 = spawn(rx1.recv());
Expand All @@ -113,15 +113,15 @@ fn multi_rx() {

assert!(t1.is_woken());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
assert_eq!(res, "four");

let res = assert_ready!(t2.poll());
assert_eq!(res.unwrap(), "four");
assert_eq!(res, "four");

drop(t2);
let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t2.poll());
assert!(res.is_none());
assert_eq!(res, "four");
}
}

Expand All @@ -135,44 +135,44 @@ fn rx_observes_final_value() {
{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "one");
assert_eq!(res, "one");
}

{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
assert_eq!(res, "one");
}

// Sending a value

let (tx, mut rx) = watch::channel("one");

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "two");
assert_eq!(res, "two");
}

{
let mut t1 = spawn(rx.recv());
assert_pending!(t1.poll());

tx.broadcast("three").unwrap();
tx.send("three").unwrap();
drop(tx);

assert!(t1.is_woken());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "three");
assert_eq!(res, "three");
}

{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
assert_eq!(res, "three");
}
}

Expand All @@ -190,7 +190,7 @@ fn poll_close() {
assert_ready!(t.poll());
}

assert!(tx.broadcast("two").is_err());
assert!(tx.send("two").is_err());
}

#[test]
Expand All @@ -210,7 +210,7 @@ fn stream_impl() {

assert_pending!(t.poll());

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

assert!(t.is_woken());

Expand All @@ -226,6 +226,6 @@ fn stream_impl() {
drop(tx);

let res = assert_ready!(t.poll());
assert!(res.is_none());
assert_eq!(res, Some("two"));
}
}