From fe51bd7a97838d248802afd417d77da4c3044e54 Mon Sep 17 00:00:00 2001 From: b-naber Date: Tue, 10 May 2022 16:05:35 +0200 Subject: [PATCH] use Arc in WeakSender --- tokio-util/tests/mpsc.rs | 5 +- tokio/src/sync/mpsc/bounded.rs | 21 +++++-- tokio/src/sync/mpsc/chan.rs | 104 +++++++++++++-------------------- 3 files changed, 59 insertions(+), 71 deletions(-) diff --git a/tokio-util/tests/mpsc.rs b/tokio-util/tests/mpsc.rs index a0e21066bd7..872964a41f7 100644 --- a/tokio-util/tests/mpsc.rs +++ b/tokio-util/tests/mpsc.rs @@ -331,7 +331,10 @@ async fn actor_weak_sender() { async fn send_message_to_self(&mut self) { let msg = ActorMessage::SelfMessage {}; - if let Some(sender) = self.sender.upgrade() { + let sender = self.sender.clone(); + + // cannot move self.sender here + if let Some(sender) = sender.upgrade() { let _ = sender.send(msg).await; self.sender = sender.downgrade(); } diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index b22228a767f..92959ac3a2a 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -63,7 +63,7 @@ pub struct Sender { /// /// ``` pub struct WeakSender { - chan: chan::TxWeak, + chan: chan::Tx, } /// Permits to send one value into the channel. @@ -1044,8 +1044,9 @@ impl Sender { // Note: If this is the last `Sender` instance we want to close the // channel when downgrading, so it's important to move into `self` here. - let chan = self.chan.downgrade(); - WeakSender { chan } + WeakSender { + chan: self.chan.downgrade(), + } } } @@ -1065,12 +1066,20 @@ impl fmt::Debug for Sender { } } +impl Clone for WeakSender { + fn clone(&self) -> Self { + WeakSender { + chan: self.chan.clone(), + } + } +} + impl WeakSender { - /// Tries to conver a WeakSender into a [`Sender`]. This will return `Some` + /// Tries to convert a WeakSender into a [`Sender`]. This will return `Some` /// if there are other `Sender` instances alive and the channel wasn't /// previously dropped, otherwise `None` is returned. - pub fn upgrade(&self) -> Option> { - self.chan.upgrade().map(Sender::new) + pub fn upgrade(self) -> Option> { + self.chan.upgrade().map(|tx| Sender::new(tx)) } } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index d2394ced899..d097ce1e4a7 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -9,10 +9,8 @@ use crate::sync::mpsc::list; use crate::sync::notify::Notify; use std::fmt; -use std::mem; use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; -use std::sync::Weak; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; use std::usize; @@ -28,16 +26,6 @@ impl fmt::Debug for Tx { } } -pub(crate) struct TxWeak { - inner: Weak>, -} - -impl fmt::Debug for TxWeak { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("TxWeak").finish() - } -} - /// Channel receiver. pub(crate) struct Rx { inner: Arc>, @@ -142,12 +130,47 @@ impl Tx { Tx { inner: chan } } - pub(super) fn downgrade(self) -> TxWeak { - // We don't decrement the `tx_counter` here, but let the counter be decremented - // through the drop of self.inner. - let weak_inner = Arc::>::downgrade(&self.inner); + pub(super) fn downgrade(self) -> Self { + if self.inner.tx_count.fetch_sub(1, AcqRel) == 1 { + // Close the list, which sends a `Close` message + self.inner.tx.close(); + + // Notify the receiver + self.wake_rx(); + } + + self + } - TxWeak::new(weak_inner) + // Returns a boolean that indicates whether the channel is closed. + pub(super) fn upgrade(self) -> Option { + let mut tx_count = self.inner.tx_count.load(Acquire); + + if tx_count == 0 { + // channel is closed + return None; + } + + loop { + match self + .inner + .tx_count + .compare_exchange(tx_count, tx_count + 1, AcqRel, Acquire) + { + Ok(prev_count) => { + assert!(prev_count != 0); + + return Some(self); + } + Err(prev_count) => { + if prev_count == 0 { + return None; + } + + tx_count = prev_count; + } + } + } } pub(super) fn semaphore(&self) -> &S { @@ -170,53 +193,6 @@ impl Tx { } } -impl TxWeak { - fn new(inner: Weak>) -> Self { - TxWeak { inner } - } - - pub(super) fn upgrade(&self) -> Option> { - let inner = self.inner.upgrade(); - - if let Some(inner) = inner { - // If we were able to upgrade, `Chan` is guaranteed to still exist, - // even though the channel might have been closed in the meantime. - // Need to check here whether the channel was actually closed. - - let mut tx_count = inner.tx_count.load(Acquire); - - if tx_count == 0 { - // channel is closed - mem::drop(inner); - return None; - } - - loop { - match inner - .tx_count - .compare_exchange(tx_count, tx_count + 1, AcqRel, Acquire) - { - Ok(prev_count) => { - assert!(prev_count != 0); - - return Some(Tx::new(inner)); - } - Err(prev_count) => { - if prev_count == 0 { - mem::drop(inner); - return None; - } - - tx_count = prev_count; - } - } - } - } else { - None - } - } -} - impl Tx { pub(crate) fn is_closed(&self) -> bool { self.inner.semaphore.is_closed()