From 9d84ce9757e24716213e2554253ffea7c3c3e334 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Thu, 18 Feb 2021 23:09:19 +0800 Subject: [PATCH 1/4] sync: impl PartialEq & Eq for mpsc Sender & Receiver --- tokio/src/sync/mpsc/bounded.rs | 2 ++ tokio/src/sync/mpsc/chan.rs | 16 ++++++++++++++++ tokio/src/sync/mpsc/unbounded.rs | 2 ++ 3 files changed, 20 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index dfe4a749b23..f6982b1048e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -16,6 +16,7 @@ use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. /// /// Instances are created by the [`channel`](channel) function. +#[derive(PartialEq, Eq)] pub struct Sender { chan: chan::Tx, } @@ -38,6 +39,7 @@ pub struct Permit<'a, T> { /// This receiver can be turned into a `Stream` using [`ReceiverStream`]. /// /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html +#[derive(PartialEq, Eq)] pub struct Receiver { /// The channel receiver chan: chan::Rx, diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f34eb0f2127..eb7f7ec92ff 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -22,6 +22,14 @@ impl fmt::Debug for Tx { } } +impl PartialEq for Tx { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } +} + +impl Eq for Tx {} + /// Channel receiver pub(crate) struct Rx { inner: Arc>, @@ -33,6 +41,14 @@ impl fmt::Debug for Rx { } } +impl PartialEq for Rx { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } +} + +impl Eq for Rx {} + pub(crate) trait Semaphore { fn is_idle(&self) -> bool; diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 29a0a29719e..ec19b9ac9ee 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -9,6 +9,7 @@ use std::task::{Context, Poll}; /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. +#[derive(PartialEq, Eq)] pub struct UnboundedSender { chan: chan::Tx, } @@ -37,6 +38,7 @@ impl fmt::Debug for UnboundedSender { /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. /// /// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html +#[derive(PartialEq, Eq)] pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, From 7083fbe3d5c63e83ce531c31057f9bae64008718 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Sat, 20 Feb 2021 09:56:17 +0800 Subject: [PATCH 2/4] sync: use same_channel method for mpsc senders instead --- tokio/src/sync/mpsc/bounded.rs | 7 +++++-- tokio/src/sync/mpsc/chan.rs | 21 +++++---------------- tokio/src/sync/mpsc/unbounded.rs | 7 +++++-- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index f6982b1048e..c276802c516 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -16,7 +16,6 @@ use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. /// /// Instances are created by the [`channel`](channel) function. -#[derive(PartialEq, Eq)] pub struct Sender { chan: chan::Tx, } @@ -39,7 +38,6 @@ pub struct Permit<'a, T> { /// This receiver can be turned into a `Stream` using [`ReceiverStream`]. /// /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html -#[derive(PartialEq, Eq)] pub struct Receiver { /// The channel receiver chan: chan::Rx, @@ -700,6 +698,11 @@ impl Sender { Ok(Permit { chan: &self.chan }) } + + /// Returns `true` if senders belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.chan.same_channel(&other.chan) + } } impl Clone for Sender { diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index eb7f7ec92ff..df12744f3fc 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -22,14 +22,6 @@ impl fmt::Debug for Tx { } } -impl PartialEq for Tx { - fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.inner, &other.inner) - } -} - -impl Eq for Tx {} - /// Channel receiver pub(crate) struct Rx { inner: Arc>, @@ -41,14 +33,6 @@ impl fmt::Debug for Rx { } } -impl PartialEq for Rx { - fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.inner, &other.inner) - } -} - -impl Eq for Rx {} - pub(crate) trait Semaphore { fn is_idle(&self) -> bool; @@ -155,6 +139,11 @@ impl Tx { pub(crate) fn wake_rx(&self) { self.inner.rx_waker.wake(); } + + /// Returns `true` if senders belong to the same channel. + pub(crate) fn same_channel(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.inner, &other.inner) + } } impl Tx { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index ec19b9ac9ee..e7924500474 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -9,7 +9,6 @@ use std::task::{Context, Poll}; /// /// Instances are created by the /// [`unbounded_channel`](unbounded_channel) function. -#[derive(PartialEq, Eq)] pub struct UnboundedSender { chan: chan::Tx, } @@ -38,7 +37,6 @@ impl fmt::Debug for UnboundedSender { /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. /// /// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html -#[derive(PartialEq, Eq)] pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, @@ -293,4 +291,9 @@ impl UnboundedSender { pub fn is_closed(&self) -> bool { self.chan.is_closed() } + + /// Returns `true` if senders belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.chan.same_channel(&other.chan) + } } From 74ec3288c4d11a1711c0692674a58d16b1a7c9a1 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Sun, 21 Feb 2021 10:15:16 +0800 Subject: [PATCH 3/4] sync: add doc test for mpsc same_channel --- tokio/src/sync/mpsc/bounded.rs | 9 +++++++++ tokio/src/sync/mpsc/unbounded.rs | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index c276802c516..8b3f70fdc91 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -700,6 +700,15 @@ impl Sender { } /// Returns `true` if senders belong to the same channel. + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1); + /// let tx2 = tx.clone(); + /// assert!(tx.same_channel(&tx2)); + /// + /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1); + /// assert!(!tx3.same_channel(&tx2)); + /// ``` pub fn same_channel(&self, other: &Self) -> bool { self.chan.same_channel(&other.chan) } diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index e7924500474..1940f9147f7 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -293,6 +293,15 @@ impl UnboundedSender { } /// Returns `true` if senders belong to the same channel. + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + /// let tx2 = tx.clone(); + /// assert!(tx.same_channel(&tx2)); + /// + /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>(); + /// assert!(!tx3.same_channel(&tx2)); + /// ``` pub fn same_channel(&self, other: &Self) -> bool { self.chan.same_channel(&other.chan) } From 06c01f198a71fa3bd14a1f8869d6e16b9223337b Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Mon, 22 Feb 2021 09:30:51 +0800 Subject: [PATCH 4/4] sync: add examples section to doc of same_channel --- tokio/src/sync/mpsc/bounded.rs | 2 ++ tokio/src/sync/mpsc/unbounded.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 8b3f70fdc91..eafb8d778ae 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -701,6 +701,8 @@ impl Sender { /// Returns `true` if senders belong to the same channel. /// + /// # Examples + /// /// ``` /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1); /// let tx2 = tx.clone(); diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 1940f9147f7..ffdb34c3b8d 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -294,6 +294,8 @@ impl UnboundedSender { /// Returns `true` if senders belong to the same channel. /// + /// # Examples + /// /// ``` /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); /// let tx2 = tx.clone();