Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Add is_closed method to mpsc senders #2726

Merged
merged 15 commits into from Sep 28, 2020
15 changes: 15 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -523,6 +523,21 @@ impl<T> Sender<T> {
enter_handle.block_on(self.send(value)).unwrap()
}

/// Checks if `Receiver` is still alive.
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
/// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
/// assert!(!tx.is_closed());
/// let tx2 = tx.clone();
/// assert!(!tx2.is_closed());
/// std::mem::drop(rx);
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
/// assert!(tx.is_closed());
/// assert!(tx2.is_closed());
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
/// ```
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}

/// Wait for channel capacity. Once capacity to send one message is
/// available, it is reserved for the caller.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -143,6 +143,10 @@ impl<T, S> Tx<T, S> {
}

impl<T, S: Semaphore> Tx<T, S> {
pub(crate) fn is_closed(&self) -> bool {
self.inner.semaphore.is_closed()
}

pub(crate) async fn closed(&mut self) {
use std::future::Future;
use std::pin::Pin;
Expand Down
14 changes: 14 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Expand Up @@ -245,4 +245,18 @@ impl<T> UnboundedSender<T> {
pub async fn closed(&mut self) {
self.chan.closed().await
}
/// Checks if `UnboundedReceiver` is still alive.
///
/// ```
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
/// assert!(!tx.is_closed());
/// let tx2 = tx.clone();
/// assert!(!tx2.is_closed());
/// std::mem::drop(rx);
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
/// assert!(tx.is_closed());
/// assert!(tx2.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}
}