From 91d1f7a0b8075a2084eef48bda36655aa7e39389 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 26 Aug 2022 15:33:46 +0200 Subject: [PATCH 1/5] sync: add merge() to semaphore permits Adds a merge() method to: * SemaphorePermit * OwnedSemaphorePermit Merging two permits instances together consumes one instance, adding the permits it holds to the remaining instance. --- tokio/src/sync/semaphore.rs | 18 ++++++++++++++++++ tokio/tests/sync_semaphore.rs | 14 ++++++++++++++ tokio/tests/sync_semaphore_owned.rs | 14 ++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 88b3d3d63c5..8cccfab1b8e 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -620,6 +620,15 @@ impl<'a> SemaphorePermit<'a> { pub fn forget(mut self) { self.permits = 0; } + + /// Merge two [`SemaphorePermit`] instances together, consuming `other` + /// without releasing the permits it holds. + /// + /// Permits held by both `self` and `other` are released when `self` drops. + pub fn merge(&mut self, mut other: Self) { + self.permits += other.permits; + other.permits = 0; + } } impl OwnedSemaphorePermit { @@ -629,6 +638,15 @@ impl OwnedSemaphorePermit { pub fn forget(mut self) { self.permits = 0; } + + /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other` + /// without releasing the permits it holds. + /// + /// Permits held by both `self` and `other` are released when `self` drops. + pub fn merge(&mut self, mut other: Self) { + self.permits += other.permits; + other.permits = 0; + } } impl Drop for SemaphorePermit<'_> { diff --git a/tokio/tests/sync_semaphore.rs b/tokio/tests/sync_semaphore.rs index a061033ed7d..508d83fbde4 100644 --- a/tokio/tests/sync_semaphore.rs +++ b/tokio/tests/sync_semaphore.rs @@ -63,6 +63,20 @@ fn forget() { assert!(sem.try_acquire().is_err()); } +#[test] +fn merge() { + let sem = Arc::new(Semaphore::new(3)); + { + let mut p1 = sem.try_acquire().unwrap(); + assert_eq!(sem.available_permits(), 2); + let p2 = sem.try_acquire_many(2).unwrap(); + assert_eq!(sem.available_permits(), 0); + p1.merge(p2); + assert_eq!(sem.available_permits(), 0); + } + assert_eq!(sem.available_permits(), 3); +} + #[tokio::test] #[cfg(feature = "full")] async fn stress_test() { diff --git a/tokio/tests/sync_semaphore_owned.rs b/tokio/tests/sync_semaphore_owned.rs index a09346f17f8..9c828ace3e1 100644 --- a/tokio/tests/sync_semaphore_owned.rs +++ b/tokio/tests/sync_semaphore_owned.rs @@ -89,6 +89,20 @@ fn forget() { assert!(sem.try_acquire_owned().is_err()); } +#[test] +fn merge() { + let sem = Arc::new(Semaphore::new(3)); + { + let mut p1 = sem.clone().try_acquire_owned().unwrap(); + assert_eq!(sem.available_permits(), 2); + let p2 = sem.clone().try_acquire_many_owned(2).unwrap(); + assert_eq!(sem.available_permits(), 0); + p1.merge(p2); + assert_eq!(sem.available_permits(), 0); + } + assert_eq!(sem.available_permits(), 3); +} + #[tokio::test] #[cfg(feature = "full")] async fn stress_test() { From 1ff74fb48650125a93a528e9e42c10b3b9b25349 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Sun, 28 Aug 2022 09:46:11 +0200 Subject: [PATCH 2/5] fix: do not allow merging of unrelated Semaphores Prevent merging permits from two different Semaphore instances. Attempting to merge unrelated permits will panic. --- tokio/src/sync/semaphore.rs | 8 ++++++++ tokio/tests/sync_semaphore.rs | 10 ++++++++++ tokio/tests/sync_semaphore_owned.rs | 10 ++++++++++ 3 files changed, 28 insertions(+) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 8cccfab1b8e..317f8c39b8a 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -626,6 +626,10 @@ impl<'a> SemaphorePermit<'a> { /// /// Permits held by both `self` and `other` are released when `self` drops. pub fn merge(&mut self, mut other: Self) { + assert!( + std::ptr::eq(self.sem, other.sem), + "merging permits from different semaphore instances" + ); self.permits += other.permits; other.permits = 0; } @@ -644,6 +648,10 @@ impl OwnedSemaphorePermit { /// /// Permits held by both `self` and `other` are released when `self` drops. pub fn merge(&mut self, mut other: Self) { + assert!( + Arc::ptr_eq(&self.sem, &other.sem), + "merging permits from different semaphore instances" + ); self.permits += other.permits; other.permits = 0; } diff --git a/tokio/tests/sync_semaphore.rs b/tokio/tests/sync_semaphore.rs index 508d83fbde4..dfd2abaa218 100644 --- a/tokio/tests/sync_semaphore.rs +++ b/tokio/tests/sync_semaphore.rs @@ -77,6 +77,16 @@ fn merge() { assert_eq!(sem.available_permits(), 3); } +#[test] +#[should_panic] +fn merge_unrelated_permits() { + let sem1 = Arc::new(Semaphore::new(3)); + let sem2 = Arc::new(Semaphore::new(3)); + let mut p1 = sem1.try_acquire().unwrap(); + let p2 = sem2.try_acquire().unwrap(); + p1.merge(p2); +} + #[tokio::test] #[cfg(feature = "full")] async fn stress_test() { diff --git a/tokio/tests/sync_semaphore_owned.rs b/tokio/tests/sync_semaphore_owned.rs index 9c828ace3e1..0ccd4ccefa8 100644 --- a/tokio/tests/sync_semaphore_owned.rs +++ b/tokio/tests/sync_semaphore_owned.rs @@ -103,6 +103,16 @@ fn merge() { assert_eq!(sem.available_permits(), 3); } +#[test] +#[should_panic] +fn merge_unrelated_permits() { + let sem1 = Arc::new(Semaphore::new(3)); + let sem2 = Arc::new(Semaphore::new(3)); + let mut p1 = sem1.try_acquire_owned().unwrap(); + let p2 = sem2.try_acquire_owned().unwrap(); + p1.merge(p2) +} + #[tokio::test] #[cfg(feature = "full")] async fn stress_test() { From ec172d482191ddd46e64a2c93febd4ef95f27ebd Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 31 Aug 2022 10:35:48 +0200 Subject: [PATCH 3/5] docs: document panics / track_caller Document merging permits from distinct Semaphore instances causes a panic, and add #[track_caller] annotation. --- tokio/src/sync/semaphore.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 317f8c39b8a..ccf44ba8a88 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -625,6 +625,12 @@ impl<'a> SemaphorePermit<'a> { /// without releasing the permits it holds. /// /// Permits held by both `self` and `other` are released when `self` drops. + /// + /// # Panics + /// + /// This function panics if permits from different [`Semaphore`] instances + /// are merged. + #[track_caller] pub fn merge(&mut self, mut other: Self) { assert!( std::ptr::eq(self.sem, other.sem), @@ -647,6 +653,12 @@ impl OwnedSemaphorePermit { /// without releasing the permits it holds. /// /// Permits held by both `self` and `other` are released when `self` drops. + /// + /// # Panics + /// + /// This function panics if permits from different [`Semaphore`] instances + /// are merged. + #[track_caller] pub fn merge(&mut self, mut other: Self) { assert!( Arc::ptr_eq(&self.sem, &other.sem), From 39b9bf10244950223348357e90019751729904e2 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 31 Aug 2022 18:28:05 +0200 Subject: [PATCH 4/5] test: assert track_caller on permit merge --- tokio/tests/sync_panic.rs | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/tokio/tests/sync_panic.rs b/tokio/tests/sync_panic.rs index 11213b51544..6c23664998f 100644 --- a/tokio/tests/sync_panic.rs +++ b/tokio/tests/sync_panic.rs @@ -1,10 +1,10 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(tokio_wasi)))] -use std::error::Error; +use std::{error::Error, sync::Arc}; use tokio::{ runtime::{Builder, Runtime}, - sync::{broadcast, mpsc, oneshot, Mutex, RwLock}, + sync::{broadcast, mpsc, oneshot, Mutex, RwLock, Semaphore}, }; mod support { @@ -160,6 +160,38 @@ fn mpsc_unbounded_receiver_blocking_recv_panic_caller() -> Result<(), Box Result<(), Box> { + let panic_location_file = test_panic(|| { + let sem1 = Arc::new(Semaphore::new(42)); + let sem2 = Arc::new(Semaphore::new(42)); + let mut p1 = sem1.try_acquire_owned().unwrap(); + let p2 = sem2.try_acquire_owned().unwrap(); + p1.merge(p2); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + +#[test] +fn semaphore_merge_unrelated_permits() -> Result<(), Box> { + let panic_location_file = test_panic(|| { + let sem1 = Semaphore::new(42); + let sem2 = Semaphore::new(42); + let mut p1 = sem1.try_acquire().unwrap(); + let p2 = sem2.try_acquire().unwrap(); + p1.merge(p2); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + fn current_thread() -> Runtime { Builder::new_current_thread().enable_all().build().unwrap() } From f8dc306fc5765b70bdca5d23c40857433a69f7f3 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 19 Sep 2022 15:08:01 +0200 Subject: [PATCH 5/5] test: disable panic tests for wasm targets --- tokio/tests/sync_semaphore.rs | 1 + tokio/tests/sync_semaphore_owned.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/tokio/tests/sync_semaphore.rs b/tokio/tests/sync_semaphore.rs index dfd2abaa218..f12edb7dfbc 100644 --- a/tokio/tests/sync_semaphore.rs +++ b/tokio/tests/sync_semaphore.rs @@ -78,6 +78,7 @@ fn merge() { } #[test] +#[cfg(not(tokio_wasm))] // No stack unwinding on wasm targets #[should_panic] fn merge_unrelated_permits() { let sem1 = Arc::new(Semaphore::new(3)); diff --git a/tokio/tests/sync_semaphore_owned.rs b/tokio/tests/sync_semaphore_owned.rs index 0ccd4ccefa8..f6945764786 100644 --- a/tokio/tests/sync_semaphore_owned.rs +++ b/tokio/tests/sync_semaphore_owned.rs @@ -104,6 +104,7 @@ fn merge() { } #[test] +#[cfg(not(tokio_wasm))] // No stack unwinding on wasm targets #[should_panic] fn merge_unrelated_permits() { let sem1 = Arc::new(Semaphore::new(3));