From 364bda618f279b4a725059d8d4d31b628d8128a1 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 4 May 2024 16:35:28 +0900 Subject: [PATCH 1/5] Add select_biased! macro --- crossbeam-channel/src/select.rs | 39 +++++++++++++++---------- crossbeam-channel/src/select_macro.rs | 38 ++++++++++++++++++++---- crossbeam-channel/tests/mpsc.rs | 2 ++ crossbeam-channel/tests/select_macro.rs | 35 +++++++++++++++++++++- 4 files changed, 92 insertions(+), 22 deletions(-) diff --git a/crossbeam-channel/src/select.rs b/crossbeam-channel/src/select.rs index ac9e408d3..0de43e0b5 100644 --- a/crossbeam-channel/src/select.rs +++ b/crossbeam-channel/src/select.rs @@ -177,6 +177,7 @@ enum Timeout { fn run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, + is_biased: bool, ) -> Option<(Token, usize, *const u8)> { if handles.is_empty() { // Wait until the timeout and return. @@ -193,8 +194,10 @@ fn run_select( } } - // Shuffle the operations for fairness. - utils::shuffle(handles); + if !is_biased { + // Shuffle the operations for fairness. + utils::shuffle(handles); + } // Create a token, which serves as a temporary variable that gets initialized in this function // and is later used by a call to `channel::read()` or `channel::write()` that completes the @@ -325,6 +328,7 @@ fn run_select( fn run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, + is_biased: bool, ) -> Option { if handles.is_empty() { // Wait until the timeout and return. @@ -341,8 +345,10 @@ fn run_ready( } } - // Shuffle the operations for fairness. - utils::shuffle(handles); + if !is_biased { + // Shuffle the operations for fairness. + utils::shuffle(handles); + } loop { let backoff = Backoff::new(); @@ -451,7 +457,7 @@ fn run_ready( pub fn try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result, TrySelectError> { - match run_select(handles, Timeout::Now) { + match run_select(handles, Timeout::Now, false) { None => Err(TrySelectError), Some((token, index, ptr)) => Ok(SelectedOperation { token, @@ -467,12 +473,13 @@ pub fn try_select<'a>( #[inline] pub fn select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + is_biased: bool, ) -> SelectedOperation<'a> { if handles.is_empty() { panic!("no operations have been added to `Select`"); } - let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap(); + let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap(); SelectedOperation { token, index, @@ -487,10 +494,11 @@ pub fn select<'a>( pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, + is_biased: bool, ) -> Result, SelectTimeoutError> { match Instant::now().checked_add(timeout) { - Some(deadline) => select_deadline(handles, deadline), - None => Ok(select(handles)), + Some(deadline) => select_deadline(handles, deadline, is_biased), + None => Ok(select(handles, is_biased)), } } @@ -499,8 +507,9 @@ pub fn select_timeout<'a>( pub(crate) fn select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, + is_biased: bool, ) -> Result, SelectTimeoutError> { - match run_select(handles, Timeout::At(deadline)) { + match run_select(handles, Timeout::At(deadline), is_biased) { None => Err(SelectTimeoutError), Some((token, index, ptr)) => Ok(SelectedOperation { token, @@ -815,7 +824,7 @@ impl<'a> Select<'a> { /// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371 /// ``` pub fn select(&mut self) -> SelectedOperation<'a> { - select(&mut self.handles) + select(&mut self.handles, false) } /// Blocks for a limited time until one of the operations becomes ready and selects it. @@ -869,7 +878,7 @@ impl<'a> Select<'a> { &mut self, timeout: Duration, ) -> Result, SelectTimeoutError> { - select_timeout(&mut self.handles, timeout) + select_timeout(&mut self.handles, timeout, false) } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. @@ -925,7 +934,7 @@ impl<'a> Select<'a> { &mut self, deadline: Instant, ) -> Result, SelectTimeoutError> { - select_deadline(&mut self.handles, deadline) + select_deadline(&mut self.handles, deadline, false) } /// Attempts to find a ready operation without blocking. @@ -964,7 +973,7 @@ impl<'a> Select<'a> { /// } /// ``` pub fn try_ready(&mut self) -> Result { - match run_ready(&mut self.handles, Timeout::Now) { + match run_ready(&mut self.handles, Timeout::Now, false) { None => Err(TryReadyError), Some(index) => Ok(index), } @@ -1021,7 +1030,7 @@ impl<'a> Select<'a> { panic!("no operations have been added to `Select`"); } - run_ready(&mut self.handles, Timeout::Never).unwrap() + run_ready(&mut self.handles, Timeout::Never, false).unwrap() } /// Blocks for a limited time until one of the operations becomes ready. @@ -1122,7 +1131,7 @@ impl<'a> Select<'a> { /// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371 /// ``` pub fn ready_deadline(&mut self, deadline: Instant) -> Result { - match run_ready(&mut self.handles, Timeout::At(deadline)) { + match run_ready(&mut self.handles, Timeout::At(deadline), false) { None => Err(ReadyTimeoutError), Some(index) => Ok(index), } diff --git a/crossbeam-channel/src/select_macro.rs b/crossbeam-channel/src/select_macro.rs index 3b71e1e50..323ccfae2 100644 --- a/crossbeam-channel/src/select_macro.rs +++ b/crossbeam-channel/src/select_macro.rs @@ -750,7 +750,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: $crate::SelectedOperation<'_> = { - let _oper = $crate::internal::select(&mut $sel); + let _oper = $crate::internal::select(&mut $sel, _IS_BIASED); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } @@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { - let _oper = $crate::internal::select_timeout(&mut $sel, $timeout); + let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, false); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } @@ -985,7 +985,8 @@ macro_rules! crossbeam_channel_internal { /// /// This macro allows you to define a set of channel operations, wait until any one of them becomes /// ready, and finally execute it. If multiple operations are ready at the same time, a random one -/// among them is selected. +/// among them is selected (i.e. the unbiased selection). Use `select_biased!` for the biased +/// selection. /// /// It is also possible to define a `default` case that gets executed if none of the operations are /// ready, either right away or for a certain duration of time. @@ -1121,8 +1122,33 @@ macro_rules! crossbeam_channel_internal { #[macro_export] macro_rules! select { ($($tokens:tt)*) => { - $crate::crossbeam_channel_internal!( - $($tokens)* - ) + { + const _IS_BIASED: bool = false; + + $crate::crossbeam_channel_internal!( + $($tokens)* + ) + } + }; +} + +/// Selects from a set of channel operations. +/// +/// This macro allows you to define a list of channel operations, wait until any one of them +/// becomes ready, and finally execute it. If multiple operations are ready at the same time, the +/// operation nearest to the front of the list is always selected (i.e. the biased selection). Use +/// [`select!`] for the unbiased selection. +/// +/// Otherwise, this macro's functionality is identical to [`select!`]. Refer to it for the syntax. +#[macro_export] +macro_rules! select_biased { + ($($tokens:tt)*) => { + { + const _IS_BIASED: bool = true; + + $crate::crossbeam_channel_internal!( + $($tokens)* + ) + } }; } diff --git a/crossbeam-channel/tests/mpsc.rs b/crossbeam-channel/tests/mpsc.rs index 5c09ee88c..307e2f400 100644 --- a/crossbeam-channel/tests/mpsc.rs +++ b/crossbeam-channel/tests/mpsc.rs @@ -176,6 +176,8 @@ macro_rules! select { ( $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ ) => ({ + const _IS_BIASED: bool = false; + cc::crossbeam_channel_internal! { $( $meth(($rx).inner) -> res => { diff --git a/crossbeam-channel/tests/select_macro.rs b/crossbeam-channel/tests/select_macro.rs index c8b96c18d..4d8f4ccc1 100644 --- a/crossbeam-channel/tests/select_macro.rs +++ b/crossbeam-channel/tests/select_macro.rs @@ -9,7 +9,7 @@ use std::ops::Deref; use std::thread; use std::time::{Duration, Instant}; -use crossbeam_channel::{after, bounded, never, select, tick, unbounded}; +use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded}; use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError}; use crossbeam_utils::thread::scope; @@ -943,6 +943,39 @@ fn fairness_send() { assert!(hits.iter().all(|x| *x >= COUNT / 4)); } +#[test] +fn unfairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + assert_eq!(hits, [COUNT, 0]); + + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + assert_eq!(hits, [COUNT, COUNT]); +} + #[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional. #[test] fn references() { From 53a3d5d9c85635a4254c7fa6b1a2337a1c89711c Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 4 May 2024 21:49:50 +0900 Subject: [PATCH 2/5] Improve test a bit --- crossbeam-channel/tests/select_macro.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crossbeam-channel/tests/select_macro.rs b/crossbeam-channel/tests/select_macro.rs index 4d8f4ccc1..412aca083 100644 --- a/crossbeam-channel/tests/select_macro.rs +++ b/crossbeam-channel/tests/select_macro.rs @@ -952,17 +952,20 @@ fn unfairness() { let (s1, r1) = unbounded::<()>(); let (s2, r2) = unbounded::<()>(); + let (s3, r3) = unbounded::<()>(); for _ in 0..COUNT { s1.send(()).unwrap(); s2.send(()).unwrap(); } + s3.send(()).unwrap(); let mut hits = [0usize; 2]; for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => unreachable!(), } } assert_eq!(hits, [COUNT, 0]); @@ -971,6 +974,7 @@ fn unfairness() { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => unreachable!(), } } assert_eq!(hits, [COUNT, COUNT]); From d96b7d4a8a881ec8d29b430378a7cc9bee7113bd Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 4 May 2024 21:50:11 +0900 Subject: [PATCH 3/5] Enforce biased selection even with timeout --- crossbeam-channel/src/select_macro.rs | 2 +- crossbeam-channel/tests/select_macro.rs | 39 +++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/crossbeam-channel/src/select_macro.rs b/crossbeam-channel/src/select_macro.rs index 323ccfae2..b6c2a31b6 100644 --- a/crossbeam-channel/src/select_macro.rs +++ b/crossbeam-channel/src/select_macro.rs @@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { - let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, false); + let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, _IS_BIASED); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } diff --git a/crossbeam-channel/tests/select_macro.rs b/crossbeam-channel/tests/select_macro.rs index 412aca083..7a2a64ed7 100644 --- a/crossbeam-channel/tests/select_macro.rs +++ b/crossbeam-channel/tests/select_macro.rs @@ -980,6 +980,45 @@ fn unfairness() { assert_eq!(hits, [COUNT, COUNT]); } +#[test] +fn unfairness_timeout() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (s3, r3) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + s3.send(()).unwrap(); + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => unreachable!(), + default(ms(1000)) => panic!(), + } + } + assert_eq!(hits, [COUNT, 0]); + + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => unreachable!(), + default(ms(1000)) => panic!(), + } + } + assert_eq!(hits, [COUNT, COUNT]); +} + #[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional. #[test] fn references() { From 1ba63bc95f9bccffff1371e78bf10eaa66d1d2cb Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 4 May 2024 21:56:52 +0900 Subject: [PATCH 4/5] Enforce biased selection even without blocking --- crossbeam-channel/src/select.rs | 5 ++-- crossbeam-channel/src/select_macro.rs | 2 +- crossbeam-channel/tests/select_macro.rs | 39 +++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/crossbeam-channel/src/select.rs b/crossbeam-channel/src/select.rs index 0de43e0b5..80fb32a50 100644 --- a/crossbeam-channel/src/select.rs +++ b/crossbeam-channel/src/select.rs @@ -456,8 +456,9 @@ fn run_ready( #[inline] pub fn try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + is_biased: bool, ) -> Result, TrySelectError> { - match run_select(handles, Timeout::Now, false) { + match run_select(handles, Timeout::Now, is_biased) { None => Err(TrySelectError), Some((token, index, ptr)) => Ok(SelectedOperation { token, @@ -773,7 +774,7 @@ impl<'a> Select<'a> { /// } /// ``` pub fn try_select(&mut self) -> Result, TrySelectError> { - try_select(&mut self.handles) + try_select(&mut self.handles, false) } /// Blocks until one of the operations becomes ready and selects it. diff --git a/crossbeam-channel/src/select_macro.rs b/crossbeam-channel/src/select_macro.rs index b6c2a31b6..f7dfe8c9a 100644 --- a/crossbeam-channel/src/select_macro.rs +++ b/crossbeam-channel/src/select_macro.rs @@ -772,7 +772,7 @@ macro_rules! crossbeam_channel_internal { $cases:tt ) => {{ let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = { - let _oper = $crate::internal::try_select(&mut $sel); + let _oper = $crate::internal::try_select(&mut $sel, _IS_BIASED); // Erase the lifetime so that `sel` can be dropped early even without NLL. unsafe { ::std::mem::transmute(_oper) } diff --git a/crossbeam-channel/tests/select_macro.rs b/crossbeam-channel/tests/select_macro.rs index 7a2a64ed7..dd91eb6c9 100644 --- a/crossbeam-channel/tests/select_macro.rs +++ b/crossbeam-channel/tests/select_macro.rs @@ -1019,6 +1019,45 @@ fn unfairness_timeout() { assert_eq!(hits, [COUNT, COUNT]); } +#[test] +fn unfairness_try() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (s3, r3) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + s3.send(()).unwrap(); + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => unreachable!(), + default() => panic!(), + } + } + assert_eq!(hits, [COUNT, 0]); + + for _ in 0..COUNT { + select_biased! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(r3) -> _ => unreachable!(), + default() => panic!(), + } + } + assert_eq!(hits, [COUNT, COUNT]); +} + #[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional. #[test] fn references() { From 5d1d81a966adb9c665ac0b3cb17231f69f3c0e16 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 4 May 2024 22:04:35 +0900 Subject: [PATCH 5/5] Improve test so that assert_eq! would be triggered --- crossbeam-channel/tests/select_macro.rs | 38 ++++++++++++------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/crossbeam-channel/tests/select_macro.rs b/crossbeam-channel/tests/select_macro.rs index dd91eb6c9..618faf4e1 100644 --- a/crossbeam-channel/tests/select_macro.rs +++ b/crossbeam-channel/tests/select_macro.rs @@ -960,24 +960,24 @@ fn unfairness() { } s3.send(()).unwrap(); - let mut hits = [0usize; 2]; + let mut hits = [0usize; 3]; for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, - recv(r3) -> _ => unreachable!(), + recv(r3) -> _ => hits[2] += 1, } } - assert_eq!(hits, [COUNT, 0]); + assert_eq!(hits, [COUNT, 0, 0]); for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, - recv(r3) -> _ => unreachable!(), + recv(r3) -> _ => hits[2] += 1, } } - assert_eq!(hits, [COUNT, COUNT]); + assert_eq!(hits, [COUNT, COUNT, 0]); } #[test] @@ -997,26 +997,26 @@ fn unfairness_timeout() { } s3.send(()).unwrap(); - let mut hits = [0usize; 2]; + let mut hits = [0usize; 3]; for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, - recv(r3) -> _ => unreachable!(), - default(ms(1000)) => panic!(), + recv(r3) -> _ => hits[2] += 1, + default(ms(1000)) => unreachable!(), } } - assert_eq!(hits, [COUNT, 0]); + assert_eq!(hits, [COUNT, 0, 0]); for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, - recv(r3) -> _ => unreachable!(), - default(ms(1000)) => panic!(), + recv(r3) -> _ => hits[2] += 1, + default(ms(1000)) => unreachable!(), } } - assert_eq!(hits, [COUNT, COUNT]); + assert_eq!(hits, [COUNT, COUNT, 0]); } #[test] @@ -1036,26 +1036,26 @@ fn unfairness_try() { } s3.send(()).unwrap(); - let mut hits = [0usize; 2]; + let mut hits = [0usize; 3]; for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, - recv(r3) -> _ => unreachable!(), - default() => panic!(), + recv(r3) -> _ => hits[2] += 1, + default() => unreachable!(), } } - assert_eq!(hits, [COUNT, 0]); + assert_eq!(hits, [COUNT, 0, 0]); for _ in 0..COUNT { select_biased! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, - recv(r3) -> _ => unreachable!(), - default() => panic!(), + recv(r3) -> _ => hits[2] += 1, + default() => unreachable!(), } } - assert_eq!(hits, [COUNT, COUNT]); + assert_eq!(hits, [COUNT, COUNT, 0]); } #[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.