diff --git a/rayon-core/src/future.rs b/rayon-core/src/future.rs new file mode 100644 index 000000000..5fa6dbc06 --- /dev/null +++ b/rayon-core/src/future.rs @@ -0,0 +1,139 @@ +#![allow(missing_debug_implementations)] +#![allow(missing_docs)] + +use crate::job::JobResult; +use crate::unwind; +use crate::ThreadPool; +use crate::{spawn, spawn_fifo}; +use crate::{Scope, ScopeFifo}; + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +pub struct AsyncSpawn { + state: Arc>>, +} + +struct AsyncSpawnJob { + state: Arc>>, +} + +struct State { + result: JobResult, + waker: Option, +} + +fn new() -> (AsyncSpawn, AsyncSpawnJob) { + let state = Arc::new(Mutex::new(State { + result: JobResult::None, + waker: None, + })); + ( + AsyncSpawn { + state: state.clone(), + }, + AsyncSpawnJob { state }, + ) +} + +impl Future for AsyncSpawn { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.state.lock().expect("rayon future lock"); + match mem::replace(&mut guard.result, JobResult::None) { + JobResult::None => { + guard.waker = Some(cx.waker().clone()); + Poll::Pending + } + JobResult::Ok(x) => Poll::Ready(x), + JobResult::Panic(p) => { + drop(guard); // don't poison the lock + unwind::resume_unwinding(p); + } + } + } +} + +impl AsyncSpawnJob { + fn execute(self, func: impl FnOnce() -> T) { + let result = unwind::halt_unwinding(func); + let mut guard = self.state.lock().expect("rayon future lock"); + guard.result = match result { + Ok(x) => JobResult::Ok(x), + Err(p) => JobResult::Panic(p), + }; + if let Some(waker) = guard.waker.take() { + waker.wake(); + } + } +} + +pub fn async_spawn(func: F) -> AsyncSpawn +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (future, job) = new(); + spawn(move || job.execute(func)); + future +} + +pub fn async_spawn_fifo(func: F) -> AsyncSpawn +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (future, job) = new(); + spawn_fifo(move || job.execute(func)); + future +} + +impl ThreadPool { + pub fn async_spawn(&self, func: F) -> AsyncSpawn + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (future, job) = new(); + self.spawn(move || job.execute(func)); + future + } + + pub fn async_spawn_fifo(&self, func: F) -> AsyncSpawn + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (future, job) = new(); + self.spawn_fifo(move || job.execute(func)); + future + } +} + +impl<'scope> Scope<'scope> { + pub fn async_spawn(&self, func: F) -> AsyncSpawn + where + F: FnOnce(&Self) -> T + Send + 'scope, + T: Send + 'scope, + { + let (future, job) = new(); + self.spawn(|scope| job.execute(move || func(scope))); + future + } +} + +impl<'scope> ScopeFifo<'scope> { + pub fn async_spawn_fifo(&self, func: F) -> AsyncSpawn + where + F: FnOnce(&Self) -> T + Send + 'scope, + T: Send + 'scope, + { + let (future, job) = new(); + self.spawn_fifo(|scope| job.execute(move || func(scope))); + future + } +} diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index c9694ee16..bf42221d1 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -80,6 +80,7 @@ mod log; mod private; mod broadcast; +mod future; mod job; mod join; mod latch; @@ -94,6 +95,7 @@ mod compile_fail; mod test; pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext}; +pub use self::future::{async_spawn, async_spawn_fifo}; pub use self::join::{join, join_context}; pub use self::registry::ThreadBuilder; pub use self::scope::{in_place_scope, scope, Scope};