Skip to content

Commit

Permalink
sync: add mpsc::Sender::max_capacity method (#4904)
Browse files Browse the repository at this point in the history
  • Loading branch information
Daksh14 committed Aug 14, 2022
1 parent d416b1d commit b3c7c98
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
40 changes: 40 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -998,6 +998,8 @@ impl<T> Sender<T> {
///
/// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
/// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
/// This is distinct from [`max_capacity`], which always returns buffer capacity initially
/// specified when calling [`channel`]
///
/// # Examples
///
Expand All @@ -1023,6 +1025,8 @@ impl<T> Sender<T> {
///
/// [`send`]: Sender::send
/// [`reserve`]: Sender::reserve
/// [`channel`]: channel
/// [`max_capacity`]: Sender::max_capacity
pub fn capacity(&self) -> usize {
self.chan.semaphore().0.available_permits()
}
Expand All @@ -1036,6 +1040,42 @@ impl<T> Sender<T> {
chan: self.chan.downgrade(),
}
}

/// Returns the maximum buffer capacity of the channel.
///
/// The maximum capacity is the buffer capacity initially specified when calling
/// [`channel`]. This is distinct from [`capacity`], which returns the *current*
/// available buffer capacity: as messages are sent and received, the
/// value returned by [`capacity`] will go up or down, whereas the value
/// returned by `max_capacity` will remain constant.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, _rx) = mpsc::channel::<()>(5);
///
/// // both max capacity and capacity are the same at first
/// assert_eq!(tx.max_capacity(), 5);
/// assert_eq!(tx.capacity(), 5);
///
/// // Making a reservation doesn't change the max capacity.
/// let permit = tx.reserve().await.unwrap();
/// assert_eq!(tx.max_capacity(), 5);
/// // but drops the capacity by one
/// assert_eq!(tx.capacity(), 4);
/// }
/// ```
///
/// [`channel`]: channel
/// [`max_capacity`]: Sender::max_capacity
/// [`capacity`]: Sender::capacity
pub fn max_capacity(&self) -> usize {
self.chan.semaphore().1
}
}

impl<T> Clone for Sender<T> {
Expand Down
19 changes: 19 additions & 0 deletions tokio/tests/sync_mpsc.rs
Expand Up @@ -921,3 +921,22 @@ async fn test_tx_count_weak_sender() {

assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none());
}

// Tests that channel `capacity` changes and `max_capacity` stays the same
#[tokio::test]
async fn test_tx_capacity() {
let (tx, _rx) = mpsc::channel::<()>(10);
// both capacities are same before
assert_eq!(tx.capacity(), 10);
assert_eq!(tx.max_capacity(), 10);

let _permit = tx.reserve().await.unwrap();
// after reserve, only capacity should drop by one
assert_eq!(tx.capacity(), 9);
assert_eq!(tx.max_capacity(), 10);

let _sent = tx.send(()).await.unwrap();
// after send, capacity should drop by one again
assert_eq!(tx.capacity(), 8);
assert_eq!(tx.max_capacity(), 10);
}

0 comments on commit b3c7c98

Please sign in to comment.