From 0d883ed52c9886ac3c385d77c6eea21d1083f0cf Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 2 Aug 2019 14:52:04 -0700 Subject: [PATCH 1/2] Add the ability to spawn futures Add a way to spawn tasks with a returned `Future`. The task is immediately queued for the thread pool to execute. --- rayon-core/src/future.rs | 138 +++++++++++++++++++++++++++++++++++++++ rayon-core/src/lib.rs | 2 + 2 files changed, 140 insertions(+) create mode 100644 rayon-core/src/future.rs diff --git a/rayon-core/src/future.rs b/rayon-core/src/future.rs new file mode 100644 index 000000000..ef703eca5 --- /dev/null +++ b/rayon-core/src/future.rs @@ -0,0 +1,138 @@ +#![allow(missing_docs)] + +use crate::job::JobResult; +use crate::unwind; +use crate::ThreadPool; +use crate::{spawn, spawn_fifo}; +use crate::{Scope, ScopeFifo}; + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +struct RayonFuture { + state: Arc>>, +} + +struct RayonFutureJob { + state: Arc>>, +} + +struct State { + result: JobResult, + waker: Option, +} + +fn new() -> (RayonFuture, RayonFutureJob) { + let state = Arc::new(Mutex::new(State { + result: JobResult::None, + waker: None, + })); + ( + RayonFuture { + state: state.clone(), + }, + RayonFutureJob { state }, + ) +} + +impl Future for RayonFuture { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.state.lock().expect("rayon future lock"); + match mem::replace(&mut guard.result, JobResult::None) { + JobResult::None => { + guard.waker = Some(cx.waker().clone()); + Poll::Pending + } + JobResult::Ok(x) => Poll::Ready(x), + JobResult::Panic(p) => { + drop(guard); // don't poison the lock + unwind::resume_unwinding(p); + } + } + } +} + +impl RayonFutureJob { + fn execute(self, func: impl FnOnce() -> T) { + let result = unwind::halt_unwinding(func); + let mut guard = self.state.lock().expect("rayon future lock"); + guard.result = match result { + Ok(x) => JobResult::Ok(x), + Err(p) => JobResult::Panic(p), + }; + if let Some(waker) = guard.waker.take() { + waker.wake(); + } + } +} + +pub fn spawn_future(func: F) -> impl Future +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (future, job) = new(); + spawn(move || job.execute(func)); + future +} + +pub fn spawn_fifo_future(func: F) -> impl Future +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (future, job) = new(); + spawn_fifo(move || job.execute(func)); + future +} + +impl ThreadPool { + pub fn spawn_future(&self, func: F) -> impl Future + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (future, job) = new(); + self.spawn(move || job.execute(func)); + future + } + + pub fn spawn_fifo_future(&self, func: F) -> impl Future + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (future, job) = new(); + self.spawn_fifo(move || job.execute(func)); + future + } +} + +impl<'scope> Scope<'scope> { + pub fn spawn_future(&self, func: F) -> impl Future + where + F: FnOnce(&Self) -> T + Send + 'scope, + T: Send + 'scope, + { + let (future, job) = new(); + self.spawn(|scope| job.execute(move || func(scope))); + future + } +} + +impl<'scope> ScopeFifo<'scope> { + pub fn spawn_fifo_future(&self, func: F) -> impl Future + where + F: FnOnce(&Self) -> T + Send + 'scope, + T: Send + 'scope, + { + let (future, job) = new(); + self.spawn_fifo(|scope| job.execute(move || func(scope))); + future + } +} diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index c9694ee16..51bb216fa 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -80,6 +80,7 @@ mod log; mod private; mod broadcast; +mod future; mod job; mod join; mod latch; @@ -94,6 +95,7 @@ mod compile_fail; mod test; pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; +pub use self::future::{spawn_fifo_future, spawn_future}; pub use self::join::{join, join_context}; pub use self::registry::ThreadBuilder; pub use self::scope::{in_place_scope, scope, Scope}; From add7ff5885115507b741a0a42c4fe052e6af7c46 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 7 Apr 2023 13:20:08 -0700 Subject: [PATCH 2/2] Rename to async_spawn(F) -> AsyncSpawn --- rayon-core/src/future.rs | 27 ++++++++++++++------------- rayon-core/src/lib.rs | 2 +- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/rayon-core/src/future.rs b/rayon-core/src/future.rs index ef703eca5..5fa6dbc06 100644 --- a/rayon-core/src/future.rs +++ b/rayon-core/src/future.rs @@ -1,3 +1,4 @@ +#![allow(missing_debug_implementations)] #![allow(missing_docs)] use crate::job::JobResult; @@ -12,11 +13,11 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; -struct RayonFuture { +pub struct AsyncSpawn { state: Arc>>, } -struct RayonFutureJob { +struct AsyncSpawnJob { state: Arc>>, } @@ -25,20 +26,20 @@ struct State { waker: Option, } -fn new() -> (RayonFuture, RayonFutureJob) { +fn new() -> (AsyncSpawn, AsyncSpawnJob) { let state = Arc::new(Mutex::new(State { result: JobResult::None, waker: None, })); ( - RayonFuture { + AsyncSpawn { state: state.clone(), }, - RayonFutureJob { state }, + AsyncSpawnJob { state }, ) } -impl Future for RayonFuture { +impl Future for AsyncSpawn { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -57,7 +58,7 @@ impl Future for RayonFuture { } } -impl RayonFutureJob { +impl AsyncSpawnJob { fn execute(self, func: impl FnOnce() -> T) { let result = unwind::halt_unwinding(func); let mut guard = self.state.lock().expect("rayon future lock"); @@ -71,7 +72,7 @@ impl RayonFutureJob { } } -pub fn spawn_future(func: F) -> impl Future +pub fn async_spawn(func: F) -> AsyncSpawn where F: FnOnce() -> T + Send + 'static, T: Send + 'static, @@ -81,7 +82,7 @@ where future } -pub fn spawn_fifo_future(func: F) -> impl Future +pub fn async_spawn_fifo(func: F) -> AsyncSpawn where F: FnOnce() -> T + Send + 'static, T: Send + 'static, @@ -92,7 +93,7 @@ where } impl ThreadPool { - pub fn spawn_future(&self, func: F) -> impl Future + pub fn async_spawn(&self, func: F) -> AsyncSpawn where F: FnOnce() -> T + Send + 'static, T: Send + 'static, @@ -102,7 +103,7 @@ impl ThreadPool { future } - pub fn spawn_fifo_future(&self, func: F) -> impl Future + pub fn async_spawn_fifo(&self, func: F) -> AsyncSpawn where F: FnOnce() -> T + Send + 'static, T: Send + 'static, @@ -114,7 +115,7 @@ impl ThreadPool { } impl<'scope> Scope<'scope> { - pub fn spawn_future(&self, func: F) -> impl Future + pub fn async_spawn(&self, func: F) -> AsyncSpawn where F: FnOnce(&Self) -> T + Send + 'scope, T: Send + 'scope, @@ -126,7 +127,7 @@ impl<'scope> Scope<'scope> { } impl<'scope> ScopeFifo<'scope> { - pub fn spawn_fifo_future(&self, func: F) -> impl Future + pub fn async_spawn_fifo(&self, func: F) -> AsyncSpawn where F: FnOnce(&Self) -> T + Send + 'scope, T: Send + 'scope, diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 51bb216fa..bf42221d1 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -95,7 +95,7 @@ mod compile_fail; mod test; pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; -pub use self::future::{spawn_fifo_future, spawn_future}; +pub use self::future::{async_spawn, async_spawn_fifo}; pub use self::join::{join, join_context}; pub use self::registry::ThreadBuilder; pub use self::scope::{in_place_scope, scope, Scope};