From a7833e300780546c091e606bb2e077542f2e2e4e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sun, 13 Dec 2020 21:39:51 -0800 Subject: [PATCH] sync: remove `try_recv()` from mpsc types (#3263) The mpsc `try_recv()` functions have an issue where a sent message happens-before a call to `try_recv()` but `try_recv()` returns `None`. Fixing this is non-trivial, so the function is removed for 1.0. When the bug is fixed, the function can be added back. Closes #2020 --- tokio/src/sync/mpsc/bounded.rs | 9 ++++-- tokio/src/sync/mpsc/chan.rs | 35 +++++++++++++--------- tokio/src/sync/mpsc/error.rs | 50 +++++++++++++++++--------------- tokio/src/sync/mpsc/unbounded.rs | 17 +---------- tokio/tests/sync_mpsc.rs | 40 +------------------------ 5 files changed, 57 insertions(+), 94 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 06b371731cd..c89efdb0580 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,6 +1,9 @@ use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; +#[cfg(unix)] +#[cfg(any(feature = "signal", feature = "process"))] +use crate::sync::mpsc::error::TryRecvError; +use crate::sync::mpsc::error::{SendError, TrySendError}; cfg_time! { use crate::sync::mpsc::error::SendTimeoutError; @@ -194,7 +197,9 @@ impl Receiver { /// /// Compared with recv, this function has two failure cases instead of /// one (one for disconnection, one for an empty buffer). - pub fn try_recv(&mut self) -> Result { + #[cfg(unix)] + #[cfg(any(feature = "signal", feature = "process"))] + pub(crate) fn try_recv(&mut self) -> Result { self.chan.try_recv() } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index a40f5c3dcce..f34eb0f2127 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -2,7 +2,6 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; -use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::list; use crate::sync::notify::Notify; @@ -259,21 +258,29 @@ impl Rx { } }) } +} - /// Receives the next value without blocking - pub(crate) fn try_recv(&mut self) -> Result { - use super::block::Read::*; - self.inner.rx_fields.with_mut(|rx_fields_ptr| { - let rx_fields = unsafe { &mut *rx_fields_ptr }; - match rx_fields.list.pop(&self.inner.tx) { - Some(Value(value)) => { - self.inner.semaphore.add_permit(); - Ok(value) +feature! { + #![all(unix, any(feature = "signal", feature = "process"))] + + use crate::sync::mpsc::error::TryRecvError; + + impl Rx { + /// Receives the next value without blocking + pub(crate) fn try_recv(&mut self) -> Result { + use super::block::Read::*; + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + Ok(value) + } + Some(Closed) => Err(TryRecvError::Closed), + None => Err(TryRecvError::Empty), } - Some(Closed) => Err(TryRecvError::Closed), - None => Err(TryRecvError::Empty), - } - }) + }) + } } } diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs index 77054529c69..d23255b5aab 100644 --- a/tokio/src/sync/mpsc/error.rs +++ b/tokio/src/sync/mpsc/error.rs @@ -67,32 +67,36 @@ impl Error for RecvError {} // ===== TryRecvError ===== -/// This enumeration is the list of the possible reasons that try_recv -/// could not return data when called. -#[derive(Debug, PartialEq)] -pub enum TryRecvError { - /// This channel is currently empty, but the Sender(s) have not yet - /// disconnected, so data may yet become available. - Empty, - /// The channel's sending half has been closed, and there will - /// never be any more data received on it. - Closed, -} +feature! { + #![all(unix, any(feature = "signal", feature = "process"))] + + /// This enumeration is the list of the possible reasons that try_recv + /// could not return data when called. + #[derive(Debug, PartialEq)] + pub(crate) enum TryRecvError { + /// This channel is currently empty, but the Sender(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// The channel's sending half has been closed, and there will + /// never be any more data received on it. + Closed, + } -impl fmt::Display for TryRecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "{}", - match self { - TryRecvError::Empty => "channel empty", - TryRecvError::Closed => "channel closed", - } - ) + impl fmt::Display for TryRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TryRecvError::Empty => "channel empty", + TryRecvError::Closed => "channel closed", + } + ) + } } -} -impl Error for TryRecvError {} + impl Error for TryRecvError {} +} cfg_time! { // ===== SendTimeoutError ===== diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 48fbca9679d..a88abf95608 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,6 +1,6 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; -use crate::sync::mpsc::error::{SendError, TryRecvError}; +use crate::sync::mpsc::error::SendError; use std::fmt; use std::task::{Context, Poll}; @@ -152,21 +152,6 @@ impl UnboundedReceiver { crate::future::block_on(self.recv()) } - /// Attempts to return a pending value on this receiver without blocking. - /// - /// This method will never block the caller in order to wait for data to - /// become available. Instead, this will always return immediately with - /// a possible option of pending data on the channel. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// block on a receiver. - /// - /// Compared with recv, this function has two failure cases instead of - /// one (one for disconnection, one for an empty buffer). - pub fn try_recv(&mut self) -> Result { - self.chan.try_recv() - } - /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index ddbbf9ea0a0..e64faca2a9c 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -5,7 +5,7 @@ use std::thread; use tokio::runtime::Runtime; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio::sync::mpsc::error::TrySendError; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -385,44 +385,6 @@ fn unconsumed_messages_are_dropped() { assert_eq!(1, Arc::strong_count(&msg)); } -#[test] -fn try_recv() { - let (tx, mut rx) = mpsc::channel(1); - match rx.try_recv() { - Err(TryRecvError::Empty) => {} - _ => panic!(), - } - tx.try_send(42).unwrap(); - match rx.try_recv() { - Ok(42) => {} - _ => panic!(), - } - drop(tx); - match rx.try_recv() { - Err(TryRecvError::Closed) => {} - _ => panic!(), - } -} - -#[test] -fn try_recv_unbounded() { - let (tx, mut rx) = mpsc::unbounded_channel(); - match rx.try_recv() { - Err(TryRecvError::Empty) => {} - _ => panic!(), - } - tx.send(42).unwrap(); - match rx.try_recv() { - Ok(42) => {} - _ => panic!(), - } - drop(tx); - match rx.try_recv() { - Err(TryRecvError::Closed) => {} - _ => panic!(), - } -} - #[test] fn blocking_recv() { let (tx, mut rx) = mpsc::channel::(1);