/
mod.rs
174 lines (161 loc) · 5.77 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#[cfg(rayon_unstable)]
use future::{self, Future, RayonFuture};
#[allow(unused_imports)]
use latch::{Latch, SpinLatch};
use job::*;
use registry::Registry;
use std::any::Any;
use std::mem;
use std::sync::Arc;
use unwind;
/// Queues `func` for execution on the Rayon threadpool in the "static"
/// (or "global") scope.
///
/// Just like a standard thread, the spawned task is not tied to the
/// current stack frame, so it cannot hold any references other than
/// those with `'static` lifetime. To spawn a task that references
/// stack data, create a scope with [the `scope()` function][scope].
///
/// [scope]: fn.scope.html
///
/// Because the task cannot borrow from the enclosing stack frame, you
/// almost certainly want to pass a `move` closure; otherwise the
/// closure will typically capture references to whichever variables
/// of the enclosing function it happens to use.
///
/// This API assumes the closure is run purely for its side-effects
/// (sending messages, modifying data protected by a mutex, and so
/// on). If you want to compute a result, consider `spawn_future()`.
///
/// # Panic handling
///
/// If the closure panics, the panic is propagated to the panic
/// handler registered in the `Configuration`, if any. See
/// [`Configuration::panic_handler()`][ph] for more details.
///
/// [ph]: struct.Configuration.html#method.panic_handler
///
/// # Examples
///
/// This code creates a Rayon task that increments a global counter.
///
/// ```rust
/// # use rayon_core as rayon;
/// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
///
/// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
///
/// rayon::spawn(move || {
///     GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
/// });
/// ```
pub fn spawn<F>(func: F)
    where F: FnOnce() + Send + 'static
{
    let current_registry = Registry::current();

    // We assert that the current registry has not terminated, which
    // is the precondition `spawn_in` places on its caller.
    unsafe { spawn_in(func, &current_registry) }
}
/// Spawns an asynchronous job in `registry`.
///
/// The job runs `func`; if `func` panics, the panic payload is handed
/// to `registry.handle_panic()`. Either way, the termination count
/// taken here is released once the job has run.
///
/// Not a public API, but used elsewhere in Rayon.
///
/// # Safety
///
/// `registry` must not yet have terminated.
pub unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
    where F: FnOnce() + Send + 'static
{
    // Keep the registry from terminating until this job has executed.
    // The matching release is marked (*) below.
    registry.increment_terminate_count();

    // The job owns its own handle to the registry so that it can
    // report panics and release the termination count when it runs.
    let job_registry = registry.clone();
    let boxed_job = Box::new(HeapJob::new(move || {
        if let Err(payload) = unwind::halt_unwinding(func) {
            job_registry.handle_panic(payload);
        }
        job_registry.terminate(); // (*) permit registry to terminate now
    }));

    // We assert that the job does not hold any references (we know
    // this because of the `'static` bound in the interface);
    // moreover, we assert that the code below is not supposed to be
    // able to panic, and hence the data won't leak but will be
    // enqueued into some deque for later execution.
    let panic_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
    registry.inject_or_push(HeapJob::as_job_ref(boxed_job));
    mem::forget(panic_guard);
}
/// Spawns a future in the static scope, scheduling it to execute on
/// Rayon's threadpool. Returns a new future that can be used to poll
/// for the result. Since this future is executing in the static scope,
/// it cannot hold references to things in the enclosing stack frame;
/// if you would like to hold such references, use [the `scope()`
/// function][scope] to create a scope.
///
/// [scope]: fn.scope.html
///
/// # Panic handling
///
/// If this future should panic, that panic will be propagated when
/// `poll()` is invoked on the return value.
#[cfg(rayon_unstable)]
pub fn spawn_future<F>(future: F) -> RayonFuture<F::Item, F::Error>
    where F: Future + Send + 'static
{
    // We assert that the current registry cannot yet have terminated,
    // which is the precondition `spawn_future_in` places on its caller.
    // (A plain `//` comment: doc comments do not apply to expressions.)
    unsafe { spawn_future_in(future, Registry::current()) }
}
/// Internal helper: wraps `registry` in a `StaticFutureScope` and
/// passes `future` together with that scope to
/// `future::new_rayon_future`, returning the resulting `RayonFuture`.
///
/// # Safety
///
/// The caller must guarantee that `registry` has not yet terminated.
#[cfg(rayon_unstable)]
pub unsafe fn spawn_future_in<F>(future: F, registry: Arc<Registry>) -> RayonFuture<F::Item, F::Error>
    where F: Future + Send + 'static
{
    // `registry` is already an owned handle that is not needed again
    // here, so move it into the scope instead of cloning it.
    let scope = StaticFutureScope::new(registry);
    future::new_rayon_future(future, scope)
}
/// Scope object used for futures spawned in the static scope. It
/// carries the handle to the registry the future is associated with.
#[cfg(rayon_unstable)]
struct StaticFutureScope {
    /// Shared handle to the registry supplied at construction.
    registry: Arc<Registry>,
}
#[cfg(rayon_unstable)]
impl StaticFutureScope {
    /// Builds a scope around `registry`, incrementing the registry's
    /// termination count as part of construction.
    ///
    /// # Safety
    ///
    /// Caller asserts that the registry has not yet terminated.
    unsafe fn new(registry: Arc<Registry>) -> StaticFutureScope {
        let scope = StaticFutureScope { registry: registry };
        scope.registry.increment_terminate_count();
        scope
    }
}
/// Justification for the `unsafe impl`. We assert that:
///
/// (a) the scope remains valid until a completion method is called.
///     In this case, "remains valid" means that the registry is not
///     terminated. This is true because we acquire a "termination
///     count" in `StaticFutureScope::new()` which is not released
///     until `future_panicked()` or `future_completed()` is invoked.
/// (b) the lifetime `'static` will not end until a completion method
///     is called. This is true because `'static` doesn't end until
///     the end of the program.
#[cfg(rayon_unstable)]
unsafe impl future::FutureScope<'static> for StaticFutureScope {
    /// Returns a new handle to the registry this scope belongs to.
    fn registry(&self) -> Arc<Registry> {
        Arc::clone(&self.registry)
    }

    /// Reports the panic payload to the registry, then releases the
    /// termination count held by this scope.
    fn future_panicked(self, err: Box<Any + Send>) {
        let StaticFutureScope { registry } = self;
        registry.handle_panic(err);
        registry.terminate();
    }

    /// Releases the termination count held by this scope.
    fn future_completed(self) {
        let StaticFutureScope { registry } = self;
        registry.terminate();
    }
}
#[cfg(test)]
mod test;