Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: support mpsc send with &self #2861

Merged
merged 14 commits into from Sep 25, 2020
4 changes: 3 additions & 1 deletion tokio-test/src/io.rs
Expand Up @@ -201,7 +201,9 @@ impl Inner {
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
self.rx.poll_recv(cx)
use futures_core::stream::Stream;

Pin::new(&mut self.rx).poll_next(cx)
}

fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
Expand Down
30 changes: 1 addition & 29 deletions tokio/src/signal/unix.rs
Expand Up @@ -391,35 +391,7 @@ impl Signal {
poll_fn(|cx| self.poll_recv(cx)).await
}

/// Polls to receive the next signal notification event, outside of an
/// `async` context.
///
/// `None` is returned if no more events can be received by this stream.
///
/// # Examples
///
/// Polling from a manually implemented future
///
/// ```rust,no_run
/// use std::pin::Pin;
/// use std::future::Future;
/// use std::task::{Context, Poll};
/// use tokio::signal::unix::Signal;
///
/// struct MyFuture {
/// signal: Signal,
/// }
///
/// impl Future for MyFuture {
/// type Output = Option<()>;
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// println!("polling MyFuture");
/// self.signal.poll_recv(cx)
/// }
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.rx.poll_recv(cx)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/mod.rs
Expand Up @@ -269,8 +269,8 @@ pub trait StreamExt: Stream {
/// # #[tokio::main(basic_scheduler)]
/// async fn main() {
/// # time::pause();
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
/// let (tx1, rx1) = mpsc::channel(10);
/// let (tx2, rx2) = mpsc::channel(10);
///
/// let mut rx = rx1.merge(rx2);
///
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/stream_map.rs
Expand Up @@ -57,8 +57,8 @@ use std::task::{Context, Poll};
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
/// let (tx1, rx1) = mpsc::channel(10);
/// let (tx2, rx2) = mpsc::channel(10);
///
/// tokio::spawn(async move {
/// tx1.send(1).await.unwrap();
Expand Down
8 changes: 6 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Expand Up @@ -165,7 +165,6 @@ impl Semaphore {
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
Expand All @@ -185,6 +184,11 @@ impl Semaphore {
}
}

/// Returns true if the semaphore is closed
pub(crate) fn is_closed(&self) -> bool {
self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
carllerche marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
Expand All @@ -194,7 +198,7 @@ impl Semaphore {
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?git
// Has the semaphore closed?
if curr & Self::CLOSED > 0 {
return Err(TryAcquireError::Closed);
}
Expand Down
9 changes: 4 additions & 5 deletions tokio/src/sync/mod.rs
Expand Up @@ -106,7 +106,7 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let (mut tx, mut rx) = mpsc::channel(100);
//! let (tx, mut rx) = mpsc::channel(100);
//!
//! tokio::spawn(async move {
//! for i in 0..10 {
Expand Down Expand Up @@ -150,7 +150,7 @@
//! for _ in 0..10 {
//! // Each task needs its own `tx` handle. This is done by cloning the
//! // original handle.
//! let mut tx = tx.clone();
//! let tx = tx.clone();
//!
//! tokio::spawn(async move {
//! tx.send(&b"data to write"[..]).await.unwrap();
Expand Down Expand Up @@ -213,7 +213,7 @@
//!
//! // Spawn tasks that will send the increment command.
//! for _ in 0..10 {
//! let mut cmd_tx = cmd_tx.clone();
//! let cmd_tx = cmd_tx.clone();
//!
//! join_handles.push(tokio::spawn(async move {
//! let (resp_tx, resp_rx) = oneshot::channel();
Expand Down Expand Up @@ -443,7 +443,6 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

Expand All @@ -470,7 +469,7 @@ cfg_not_sync! {

cfg_signal! {
pub(crate) mod mpsc;
pub(crate) mod semaphore_ll;
pub(crate) mod batch_semaphore;
}
}

Expand Down