Skip to content

Commit

Permalink
sync: watch channel breaking changes (#2806)
Browse files Browse the repository at this point in the history
Fixes: #2172
  • Loading branch information
blasrodri committed Sep 2, 2020
1 parent 8270774 commit 5a1a6dc
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 47 deletions.
4 changes: 2 additions & 2 deletions tokio/src/sync/barrier.rs
Expand Up @@ -96,7 +96,7 @@ impl Barrier {
// wake everyone, increment the generation, and return
state
.waker
.broadcast(state.generation)
.send(state.generation)
.expect("there is at least one receiver");
state.arrived = 0;
state.generation += 1;
Expand All @@ -112,7 +112,7 @@ impl Barrier {
loop {
// note that the first time through the loop, this _will_ yield a generation
// immediately, since we cloned a receiver that has never seen any values.
if wait.recv().await.expect("sender hasn't been closed") >= generation {
if wait.recv().await >= generation {
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/sync/mod.rs
Expand Up @@ -330,7 +330,7 @@
//! // If the configuration changed, send the new config value
//! // on the watch channel.
//! if new_config != config {
//! tx.broadcast(new_config.clone()).unwrap();
//! tx.send(new_config.clone()).unwrap();
//! config = new_config;
//! }
//! }
Expand Down Expand Up @@ -358,7 +358,7 @@
//! // Receive the **initial** configuration value. As this is the
//! // first time the config is received from the watch, it will
//! // always complete immediatedly.
//! let mut conf = rx.recv().await.unwrap();
//! let mut conf = rx.recv().await;
//!
//! let mut op_start = Instant::now();
//! let mut delay = time::delay_until(op_start + conf.timeout);
Expand All @@ -376,7 +376,7 @@
//! delay = time::delay_until(op_start + conf.timeout);
//! }
//! new_conf = rx.recv() => {
//! conf = new_conf.unwrap();
//! conf = new_conf;
//!
//! // The configuration has been updated. Update the
//! // `delay` using the new `timeout` value.
Expand Down
34 changes: 18 additions & 16 deletions tokio/src/sync/watch.rs
Expand Up @@ -23,12 +23,12 @@
//! let (tx, mut rx) = watch::channel("hello");
//!
//! tokio::spawn(async move {
//! while let Some(value) = rx.recv().await {
//! while let Some(value) = Some(rx.recv().await) {
//! println!("received = {:?}", value);
//! }
//! });
//!
//! tx.broadcast("world")?;
//! tx.send("world")?;
//! # Ok(())
//! # }
//! ```
Expand Down Expand Up @@ -162,12 +162,12 @@ const CLOSED: usize = 1;
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
/// while let Some(value) = rx.recv().await {
/// while let Some(value) = Some(rx.recv().await) {
/// println!("received = {:?}", value);
/// }
/// });
///
/// tx.broadcast("world")?;
/// tx.send("world")?;
/// # Ok(())
/// # }
/// ```
Expand Down Expand Up @@ -223,7 +223,7 @@ impl<T> Receiver<T> {

// TODO: document
#[doc(hidden)]
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Ref<'a, T>> {
// Make sure the task is up to date
self.inner.waker.register_by_ref(cx.waker());

Expand All @@ -233,12 +233,14 @@ impl<T> Receiver<T> {
if self.inner.version.swap(version, Relaxed) != version {
let inner = self.shared.value.read().unwrap();

return Ready(Some(Ref { inner }));
return Ready(Ref { inner });
}

if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
return Ready(None);
let inner = self.shared.value.read().unwrap();

return Ready(Ref { inner });
}

Pending
Expand All @@ -264,25 +266,25 @@ impl<T: Clone> Receiver<T> {
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// let v = rx.recv().await.unwrap();
/// let v = rx.recv().await;
/// assert_eq!(v, "hello");
///
/// tokio::spawn(async move {
/// tx.broadcast("goodbye").unwrap();
/// tx.send("goodbye").unwrap();
/// });
///
/// // Waits for the new task to spawn and send the value.
/// let v = rx.recv().await.unwrap();
/// let v = rx.recv().await;
/// assert_eq!(v, "goodbye");
///
/// let v = rx.recv().await;
/// assert!(v.is_none());
/// assert_eq!(v, "goodbye");
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
pub async fn recv(&mut self) -> T {
poll_fn(|cx| {
let v_ref = ready!(self.poll_recv_ref(cx));
Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
Poll::Ready((*v_ref).clone())
})
.await
}
Expand All @@ -295,7 +297,7 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> {
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let v_ref = ready!(self.poll_recv_ref(cx));

Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
Poll::Ready(Some((*v_ref).clone()))
}
}

Expand All @@ -318,8 +320,8 @@ impl<T> Drop for Receiver<T> {
}

impl<T> Sender<T> {
/// Broadcasts a new value via the channel, notifying all receivers.
pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
/// Sends a new value via the channel, notifying all receivers.
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
// All `Watch` handles have been canceled
Expand Down
52 changes: 26 additions & 26 deletions tokio/tests/sync_watch.rs
Expand Up @@ -12,7 +12,7 @@ fn single_rx_recv() {

{
let mut t = spawn(rx.recv());
let v = assert_ready!(t.poll()).unwrap();
let v = assert_ready!(t.poll());
assert_eq!(v, "one");
}

Expand All @@ -21,11 +21,11 @@ fn single_rx_recv() {

assert_pending!(t.poll());

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

assert!(t.is_woken());

let v = assert_ready!(t.poll()).unwrap();
let v = assert_ready!(t.poll());
assert_eq!(v, "two");
}

Expand All @@ -37,7 +37,7 @@ fn single_rx_recv() {
drop(tx);

let res = assert_ready!(t.poll());
assert!(res.is_none());
assert_eq!(res, "two");
}
}

Expand All @@ -51,10 +51,10 @@ fn multi_rx() {
let mut t2 = spawn(rx2.recv());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "one");
assert_eq!(res, "one");

let res = assert_ready!(t2.poll());
assert_eq!(res.unwrap(), "one");
assert_eq!(res, "one");
}

let mut t2 = spawn(rx2.recv());
Expand All @@ -65,30 +65,30 @@ fn multi_rx() {
assert_pending!(t1.poll());
assert_pending!(t2.poll());

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

assert!(t1.is_woken());
assert!(t2.is_woken());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "two");
assert_eq!(res, "two");
}

{
let mut t1 = spawn(rx1.recv());

assert_pending!(t1.poll());

tx.broadcast("three").unwrap();
tx.send("three").unwrap();

assert!(t1.is_woken());
assert!(t2.is_woken());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "three");
assert_eq!(res, "three");

let res = assert_ready!(t2.poll());
assert_eq!(res.unwrap(), "three");
assert_eq!(res, "three");
}

drop(t2);
Expand All @@ -100,10 +100,10 @@ fn multi_rx() {
assert_pending!(t1.poll());
assert_pending!(t2.poll());

tx.broadcast("four").unwrap();
tx.send("four").unwrap();

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "four");
assert_eq!(res, "four");
drop(t1);

let mut t1 = spawn(rx1.recv());
Expand All @@ -113,15 +113,15 @@ fn multi_rx() {

assert!(t1.is_woken());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
assert_eq!(res, "four");

let res = assert_ready!(t2.poll());
assert_eq!(res.unwrap(), "four");
assert_eq!(res, "four");

drop(t2);
let mut t2 = spawn(rx2.recv());
let res = assert_ready!(t2.poll());
assert!(res.is_none());
assert_eq!(res, "four");
}
}

Expand All @@ -135,44 +135,44 @@ fn rx_observes_final_value() {
{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "one");
assert_eq!(res, "one");
}

{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
assert_eq!(res, "one");
}

// Sending a value

let (tx, mut rx) = watch::channel("one");

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "two");
assert_eq!(res, "two");
}

{
let mut t1 = spawn(rx.recv());
assert_pending!(t1.poll());

tx.broadcast("three").unwrap();
tx.send("three").unwrap();
drop(tx);

assert!(t1.is_woken());

let res = assert_ready!(t1.poll());
assert_eq!(res.unwrap(), "three");
assert_eq!(res, "three");
}

{
let mut t1 = spawn(rx.recv());
let res = assert_ready!(t1.poll());
assert!(res.is_none());
assert_eq!(res, "three");
}
}

Expand All @@ -190,7 +190,7 @@ fn poll_close() {
assert_ready!(t.poll());
}

assert!(tx.broadcast("two").is_err());
assert!(tx.send("two").is_err());
}

#[test]
Expand All @@ -210,7 +210,7 @@ fn stream_impl() {

assert_pending!(t.poll());

tx.broadcast("two").unwrap();
tx.send("two").unwrap();

assert!(t.is_woken());

Expand All @@ -226,6 +226,6 @@ fn stream_impl() {
drop(tx);

let res = assert_ready!(t.poll());
assert!(res.is_none());
assert_eq!(res, Some("two"));
}
}

0 comments on commit 5a1a6dc

Please sign in to comment.