Skip to content

Commit

Permalink
Stabilize a number of APIs:
Browse files Browse the repository at this point in the history
- `rayon_core::ThreadPool::join`
- `rayon_core::ThreadPool::scope`
- `rayon_core::ThreadPool::spawn`
- `rayon_core::spawn` -- runs async task in current (or global) thread-pool

The only one that enables something fundamentally new that wasn't
available in a stable API is `rayon_core::spawn`. The `ThreadPool` APIs
are convenience wrappers around other APIs.

The following APIs remain **unstable**:

- `ThreadPool::global` -- I don't know that we want to expose th `Arc`
  here, although it's no real commitment
- everything related to futures -- that is all planned to change
  • Loading branch information
nikomatsakis committed Jun 14, 2017
1 parent f500505 commit 1c8a427
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 15 deletions.
7 changes: 0 additions & 7 deletions examples/cpu_monitor.rs
Expand Up @@ -77,16 +77,9 @@ fn task_stall_root(args: &Args) {
rayon::join(|| task(args), || wait_for_user());
}

#[cfg(rayon_unstable)]
fn task_stall_scope(args: &Args) {
rayon::scope(|scope| {
scope.spawn(move |_| task(args));
scope.spawn(move |_| wait_for_user());
});
}

#[cfg(not(rayon_unstable))]
fn task_stall_scope(_args: &Args) {
println!("try `RUSTFLAGS='--cfg rayon_unstable' cargo run`");
process::exit(1);
}
2 changes: 0 additions & 2 deletions rayon-core/src/lib.rs
Expand Up @@ -55,7 +55,6 @@ mod registry;
mod future;
mod scope;
mod sleep;
#[cfg(rayon_unstable)]
mod spawn;
mod test;
mod thread_pool;
Expand All @@ -67,7 +66,6 @@ pub use thread_pool::current_thread_index;
pub use thread_pool::current_thread_has_pending_tasks;
pub use join::join;
pub use scope::{scope, Scope};
#[cfg(rayon_unstable)]
pub use spawn::spawn;
#[cfg(rayon_unstable)]
pub use spawn::spawn_future;
Expand Down
6 changes: 6 additions & 0 deletions rayon-core/src/spawn/mod.rs
@@ -1,3 +1,4 @@
#[cfg(rayon_unstable)]
use future::{self, Future, RayonFuture};
#[allow(unused_imports)]
use latch::{Latch, SpinLatch};
Expand Down Expand Up @@ -108,6 +109,7 @@ pub unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
///
/// If this future should panic, that panic will be propagated when
/// `poll()` is invoked on the return value.
#[cfg(rayon_unstable)]
pub fn spawn_future<F>(future: F) -> RayonFuture<F::Item, F::Error>
where F: Future + Send + 'static
{
Expand All @@ -118,6 +120,7 @@ pub fn spawn_future<F>(future: F) -> RayonFuture<F::Item, F::Error>
/// Internal helper function.
///
/// Unsafe because caller must guarantee that `registry` has not yet terminated.
#[cfg(rayon_unstable)]
pub unsafe fn spawn_future_in<F>(future: F, registry: Arc<Registry>) -> RayonFuture<F::Item, F::Error>
where F: Future + Send + 'static
{
Expand All @@ -126,10 +129,12 @@ pub unsafe fn spawn_future_in<F>(future: F, registry: Arc<Registry>) -> RayonFut
future::new_rayon_future(future, scope)
}

#[cfg(rayon_unstable)]
struct StaticFutureScope {
registry: Arc<Registry>
}

#[cfg(rayon_unstable)]
impl StaticFutureScope {
/// Caller asserts that the registry has not yet terminated.
unsafe fn new(registry: Arc<Registry>) -> Self {
Expand All @@ -149,6 +154,7 @@ impl StaticFutureScope {
/// (b) the lifetime `'static` will not end until a completion
/// method is called. This is true because `'static` doesn't
/// end until the end of the program.
#[cfg(rayon_unstable)]
unsafe impl future::FutureScope<'static> for StaticFutureScope {
fn registry(&self) -> Arc<Registry> {
self.registry.clone()
Expand Down
8 changes: 7 additions & 1 deletion rayon-core/src/spawn/test.rs
@@ -1,3 +1,4 @@
#[cfg(rayon_unstable)]
use futures::{lazy, Future};

use scope;
Expand All @@ -6,7 +7,9 @@ use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;

use {Configuration, ThreadPool};
use super::{spawn, spawn_future};
use super::spawn;
#[cfg(rayon_unstable)]
use super::spawn_future;

#[test]
fn spawn_then_join_in_worker() {
Expand Down Expand Up @@ -50,6 +53,7 @@ fn panic_fwd() {
}

#[test]
#[cfg(rayon_unstable)]
fn async_future_map() {
let data = Arc::new(Mutex::new(format!("Hello, ")));

Expand All @@ -70,6 +74,7 @@ fn async_future_map() {

#[test]
#[should_panic(expected = "Hello, world!")]
#[cfg(rayon_unstable)]
fn async_future_panic_prop() {
let future = spawn_future(lazy(move || Ok::<(), ()>(argh())));
let _ = future.rayon_wait(); // should panic, not return a value
Expand All @@ -82,6 +87,7 @@ fn async_future_panic_prop() {
}

#[test]
#[cfg(rayon_unstable)]
fn async_future_scope_interact() {
let future = spawn_future(lazy(move || Ok::<usize, ()>(22)));

Expand Down
4 changes: 0 additions & 4 deletions rayon-core/src/thread_pool/mod.rs
Expand Up @@ -7,7 +7,6 @@ use log::Event::*;
use job::StackJob;
use join;
use {scope, Scope};
#[cfg(rayon_unstable)]
use spawn;
use std::sync::Arc;
use std::error::Error;
Expand Down Expand Up @@ -208,7 +207,6 @@ impl ThreadPool {
/// Execute `oper_a` and `oper_b` in the thread-pool and return
/// the results. Equivalent to `self.install(|| join(oper_a,
/// oper_b))`.
#[cfg(rayon_unstable)]
pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
where A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
Expand All @@ -224,7 +222,6 @@ impl ThreadPool {
/// See also: [the `scope()` function][scope].
///
/// [scope]: fn.scope.html
#[cfg(rayon_unstable)]
pub fn scope<'scope, OP, R>(&self, op: OP) -> R
where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send
{
Expand All @@ -239,7 +236,6 @@ impl ThreadPool {
/// See also: [the `spawn()` function defined on scopes][spawn].
///
/// [spawn]: struct.Scope.html#method.spawn
#[cfg(rayon_unstable)]
pub fn spawn<OP>(&self, op: OP)
where OP: FnOnce() + Send + 'static
{
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Expand Up @@ -39,7 +39,6 @@ pub use rayon_core::initialize;
pub use rayon_core::ThreadPool;
pub use rayon_core::join;
pub use rayon_core::{scope, Scope};
#[cfg(rayon_unstable)]
pub use rayon_core::spawn;
#[cfg(rayon_unstable)]
pub use rayon_core::spawn_future;
Expand Down

0 comments on commit 1c8a427

Please sign in to comment.