From e06b257e09b8ca1def4a3537a4448a31f2ede388 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Fri, 5 Mar 2021 00:45:40 +0800 Subject: [PATCH] sync: add same_channel method to mpsc Senders (#3532) --- tokio/src/sync/mpsc/bounded.rs | 16 ++++++++++++++++ tokio/src/sync/mpsc/chan.rs | 5 +++++ tokio/src/sync/mpsc/unbounded.rs | 16 ++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index dfe4a749b23..eafb8d778ae 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -698,6 +698,22 @@ impl Sender { Ok(Permit { chan: &self.chan }) } + + /// Returns `true` if senders belong to the same channel. + /// + /// # Examples + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1); + /// let tx2 = tx.clone(); + /// assert!(tx.same_channel(&tx2)); + /// + /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1); + /// assert!(!tx3.same_channel(&tx2)); + /// ``` + pub fn same_channel(&self, other: &Self) -> bool { + self.chan.same_channel(&other.chan) + } } impl Clone for Sender { diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f34eb0f2127..df12744f3fc 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -139,6 +139,11 @@ impl Tx { pub(crate) fn wake_rx(&self) { self.inner.rx_waker.wake(); } + + /// Returns `true` if senders belong to the same channel. + pub(crate) fn same_channel(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } } impl Tx { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 29a0a29719e..ffdb34c3b8d 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -291,4 +291,20 @@ impl UnboundedSender { pub fn is_closed(&self) -> bool { self.chan.is_closed() } + + /// Returns `true` if senders belong to the same channel. + /// + /// # Examples + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + /// let tx2 = tx.clone(); + /// assert!(tx.same_channel(&tx2)); + /// + /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>(); + /// assert!(!tx3.same_channel(&tx2)); + /// ``` + pub fn same_channel(&self, other: &Self) -> bool { + self.chan.same_channel(&other.chan) + } }