Skip to content

Commit

Permalink
sync: implement Weak version of mpsc::Sender (#4595)
Browse files Browse the repository at this point in the history
  • Loading branch information
b-naber committed Jul 27, 2022
1 parent b2ada60 commit 1d75b57
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 7 deletions.
68 changes: 68 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
@@ -1,3 +1,4 @@
use crate::loom::sync::Arc;
use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
Expand All @@ -22,6 +23,40 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}

/// A sender that does not prevent the channel from being closed.
///
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
/// instances remain, the channel is closed.
///
/// In order to send messages, the `WeakSender` needs to be upgraded using
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
///
/// [`Sender`]: Sender
/// [`WeakSender::upgrade`]: WeakSender::upgrade
///
/// #Examples
///
/// ```
/// use tokio::sync::mpsc::channel;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, _rx) = channel::<i32>(15);
/// let tx_weak = tx.downgrade();
///
/// // Upgrading will succeed because `tx` still exists.
/// assert!(tx_weak.upgrade().is_some());
///
/// // If we drop `tx`, then it will fail.
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
/// }
/// ```
pub struct WeakSender<T> {
chan: Arc<chan::Chan<T, Semaphore>>,
}

/// Permits to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
Expand Down Expand Up @@ -991,6 +1026,16 @@ impl<T> Sender<T> {
pub fn capacity(&self) -> usize {
self.chan.semaphore().0.available_permits()
}

/// Converts the `Sender` to a [`WeakSender`] that does not count
/// towards RAII semantics, i.e. if all `Sender` instances of the
/// channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
pub fn downgrade(&self) -> WeakSender<T> {
WeakSender {
chan: self.chan.downgrade(),
}
}
}

impl<T> Clone for Sender<T> {
Expand All @@ -1009,6 +1054,29 @@ impl<T> fmt::Debug for Sender<T> {
}
}

impl<T> Clone for WeakSender<T> {
fn clone(&self) -> Self {
WeakSender {
chan: self.chan.clone(),
}
}
}

impl<T> WeakSender<T> {
/// Tries to convert a WeakSender into a [`Sender`]. This will return `Some`
/// if there are other `Sender` instances alive and the channel wasn't
/// previously dropped, otherwise `None` is returned.
pub fn upgrade(&self) -> Option<Sender<T>> {
chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
}
}

impl<T> fmt::Debug for WeakSender<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("WeakSender").finish()
}
}

// ===== impl Permit =====

impl<T> Permit<'_, T> {
Expand Down
32 changes: 27 additions & 5 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -10,9 +10,10 @@ use crate::sync::notify::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Channel sender.
pub(crate) struct Tx<T, S> {
Expand Down Expand Up @@ -46,7 +47,7 @@ pub(crate) trait Semaphore {
fn is_closed(&self) -> bool;
}

struct Chan<T, S> {
pub(super) struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,

Expand Down Expand Up @@ -129,6 +130,30 @@ impl<T, S> Tx<T, S> {
Tx { inner: chan }
}

pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
self.inner.clone()
}

// Returns the upgraded channel or None if the upgrade failed.
pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
let mut tx_count = chan.tx_count.load(Acquire);

loop {
if tx_count == 0 {
// channel is closed
return None;
}

match chan
.tx_count
.compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
{
Ok(_) => return Some(Tx { inner: chan }),
Err(prev_count) => tx_count = prev_count,
}
}
}

pub(super) fn semaphore(&self) -> &S {
&self.inner.semaphore
}
Expand Down Expand Up @@ -378,9 +403,6 @@ impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;

impl Semaphore for AtomicUsize {
fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/mod.rs
Expand Up @@ -90,7 +90,7 @@
pub(super) mod block;

mod bounded;
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender};
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender, WeakSender};

mod chan;

Expand Down

0 comments on commit 1d75b57

Please sign in to comment.