diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 540d8d8e264..af4f67a7fb9 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -8,7 +8,7 @@ use crate::sync::Notify; use std::fmt; use std::process; -use std::sync::atomic::Ordering::{AcqRel, Relaxed, SeqCst}; +use std::sync::atomic::Ordering::{AcqRel, Relaxed}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; @@ -223,19 +223,14 @@ where let notified = self.inner.notify_rx_closed.notified(); pin!(notified); - // Polling this for first time will register the waiter and - // return `Pending` or return `Ready` right away. If `Ready` - // is returned we are done - let aquired_lost_notification = - crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) { - Poll::Ready(()) => Poll::Ready(true), - Poll::Pending => Poll::Ready(false), - }) - .await; - - if aquired_lost_notification { - return; - } + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) + }) + .await; if self.inner.semaphore.closed() { return;