Skip to content

Commit

Permalink
sync: Add is_closed method to mpsc senders (#2726)
Browse files Browse the repository at this point in the history
Co-authored-by: Alice Ryhl <alice@ryhl.io>
  • Loading branch information
MikailBag and Darksonn committed Sep 28, 2020
1 parent 99d4061 commit 078d0a2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
22 changes: 22 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -523,6 +523,28 @@ impl<T> Sender<T> {
enter_handle.block_on(self.send(value)).unwrap()
}

/// Checks if the channel has been closed. This happens when the
/// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
/// called.
///
/// [`Receiver`]: crate::sync::mpsc::Receiver
/// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
///
/// ```
/// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
/// assert!(!tx.is_closed());
///
/// let tx2 = tx.clone();
/// assert!(!tx2.is_closed());
///
/// drop(rx);
/// assert!(tx.is_closed());
/// assert!(tx2.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}

/// Wait for channel capacity. Once capacity to send one message is
/// available, it is reserved for the caller.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -143,6 +143,10 @@ impl<T, S> Tx<T, S> {
}

impl<T, S: Semaphore> Tx<T, S> {
pub(crate) fn is_closed(&self) -> bool {
self.inner.semaphore.is_closed()
}

pub(crate) async fn closed(&mut self) {
use std::future::Future;
use std::pin::Pin;
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Expand Up @@ -245,4 +245,25 @@ impl<T> UnboundedSender<T> {
pub async fn closed(&mut self) {
self.chan.closed().await
}
/// Checks if the channel has been closed. This happens when the
/// [`UnboundedReceiver`] is dropped, or when the
/// [`UnboundedReceiver::close`] method is called.
///
/// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
/// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
///
/// ```
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
/// assert!(!tx.is_closed());
///
/// let tx2 = tx.clone();
/// assert!(!tx2.is_closed());
///
/// drop(rx);
/// assert!(tx.is_closed());
/// assert!(tx2.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}
}

0 comments on commit 078d0a2

Please sign in to comment.