diff --git a/futures-core/src/task/spawn.rs b/futures-core/src/task/spawn.rs index 895e81b598..eaa969096c 100644 --- a/futures-core/src/task/spawn.rs +++ b/futures-core/src/task/spawn.rs @@ -12,7 +12,7 @@ pub trait Spawn { /// represent relatively rare scenarios, such as the executor /// having been shut down so that it is no longer able to accept /// tasks. - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError>; /// Determines whether the executor is able to spawn new tasks. @@ -38,7 +38,7 @@ pub trait LocalSpawn { /// represent relatively rare scenarios, such as the executor /// having been shut down so that it is no longer able to accept /// tasks. - fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>; /// Determines whether the executor is able to spawn new tasks. @@ -87,8 +87,8 @@ impl SpawnError { } } -impl Spawn for &mut Sp { - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) +impl Spawn for &Sp { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_obj(self, future) } @@ -98,8 +98,8 @@ impl Spawn for &mut Sp { } } -impl LocalSpawn for &mut Sp { - fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) +impl LocalSpawn for &Sp { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_local_obj(self, future) } @@ -115,7 +115,7 @@ mod if_alloc { use super::*; impl Spawn for Box { - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { (**self).spawn_obj(future) } @@ -126,7 +126,7 @@ mod if_alloc { } impl LocalSpawn for Box { - fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { (**self).spawn_local_obj(future) } diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 9985dca5b9..5142354b09 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -341,7 +341,7 @@ impl Iterator for BlockingStream { impl Spawn for LocalSpawner { fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { @@ -363,7 +363,7 @@ impl Spawn for LocalSpawner { impl LocalSpawn for LocalSpawner { fn spawn_local_obj( - &mut self, + &self, future: LocalFutureObj<'static, ()>, ) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index fded082f27..c06907ddc0 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -143,17 +143,7 @@ impl ThreadPool { impl Spawn for ThreadPool { fn spawn_obj( - &mut self, - future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { - self.spawn_obj_ok(future); - Ok(()) - } -} - -impl Spawn for &ThreadPool { - fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { self.spawn_obj_ok(future); diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 44b0a76ff0..42fefcf172 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -39,7 +39,7 @@ fn run_until_single_future() { #[test] fn run_until_ignores_spawned() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); assert_eq!(pool.run_until(lazy(|_| ())), ()); } @@ -48,7 +48,7 @@ fn run_until_ignores_spawned() { fn run_until_executes_spawned() { let (tx, rx) = oneshot::channel(); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(lazy(move |_| { tx.send(()).unwrap(); () @@ -69,8 +69,8 @@ fn run_executes_spawned() { let cnt2 = cnt.clone(); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); - let mut spawn2 = pool.spawner(); + let spawn = pool.spawner(); + let spawn2 = pool.spawner(); spawn.spawn_local_obj(Box::pin(lazy(move |_| { spawn2.spawn_local_obj(Box::pin(lazy(move |_| { @@ -93,7 +93,7 @@ fn run_spawn_many() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); for _ in 0..ITER { let cnt = cnt.clone(); @@ -121,7 +121,7 @@ fn try_run_one_executes_one_ready() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); for _ in 0..ITER { spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); @@ -150,7 +150,7 @@ fn try_run_one_returns_on_no_progress() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); let waker: Rc>> = Rc::new(Cell::new(None)); { @@ -189,7 +189,7 @@ fn run_until_stalled_returns_if_empty() { #[test] fn run_until_stalled_returns_multiple_times() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); let cnt1 = cnt.clone(); @@ -211,7 +211,7 @@ fn run_until_stalled_executes_all_ready() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); for i in 0..ITER { for _ in 0..PER_ITER { @@ -236,7 +236,7 @@ fn run_until_stalled_executes_all_ready() { #[should_panic] fn nesting_run() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_obj(Box::pin(lazy(|_| { let mut pool = LocalPool::new(); @@ -250,7 +250,7 @@ fn nesting_run() { #[should_panic] fn nesting_run_run_until_stalled() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_obj(Box::pin(lazy(|_| { let mut pool = LocalPool::new(); @@ -297,7 +297,7 @@ fn tasks_are_scheduled_fairly() { } let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), diff --git a/futures-test/src/task/noop_spawner.rs b/futures-test/src/task/noop_spawner.rs index 947fd119e6..927a2e5cb0 100644 --- a/futures-test/src/task/noop_spawner.rs +++ b/futures-test/src/task/noop_spawner.rs @@ -27,7 +27,7 @@ impl NoopSpawner { impl Spawn for NoopSpawner { fn spawn_obj( - &mut self, + &self, _future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { Ok(()) diff --git a/futures-test/src/task/panic_spawner.rs b/futures-test/src/task/panic_spawner.rs index 4206d44d9e..5fac9e9fd4 100644 --- a/futures-test/src/task/panic_spawner.rs +++ b/futures-test/src/task/panic_spawner.rs @@ -28,7 +28,7 @@ impl PanicSpawner { impl Spawn for PanicSpawner { fn spawn_obj( - &mut self, + &self, _future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { panic!("should not spawn") diff --git a/futures-test/src/task/record_spawner.rs b/futures-test/src/task/record_spawner.rs index 3b13d00222..b93bd472db 100644 --- a/futures-test/src/task/record_spawner.rs +++ b/futures-test/src/task/record_spawner.rs @@ -1,5 +1,6 @@ use futures_core::future::FutureObj; use futures_core::task::{Spawn, SpawnError}; +use std::cell::{Ref, RefCell}; /// An implementation of [`Spawn`](futures_core::task::Spawn) that records /// any [`Future`](futures_core::future::Future)s spawned on it. @@ -14,37 +15,29 @@ use futures_core::task::{Spawn, SpawnError}; /// recorder.spawn(async { }).unwrap(); /// assert_eq!(recorder.spawned().len(), 1); /// ``` -#[derive(Debug)] +#[derive(Debug, Default)] pub struct RecordSpawner { - spawned: Vec>, + spawned: RefCell>>, } impl RecordSpawner { /// Create a new instance pub fn new() -> Self { - Self { - spawned: Vec::new(), - } + Default::default() } /// Inspect any futures that were spawned onto this [`Spawn`]. - pub fn spawned(&self) -> &[FutureObj<'static, ()>] { - &self.spawned + pub fn spawned(&self) -> Ref<'_, Vec>> { + self.spawned.borrow() } } impl Spawn for RecordSpawner { fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { - self.spawned.push(future); + self.spawned.borrow_mut().push(future); Ok(()) } } - -impl Default for RecordSpawner { - fn default() -> Self { - Self::new() - } -} diff --git a/futures-util/src/compat/executor.rs b/futures-util/src/compat/executor.rs index 1fe080055e..43b10c2553 100644 --- a/futures-util/src/compat/executor.rs +++ b/futures-util/src/compat/executor.rs @@ -68,7 +68,7 @@ where Ex: Executor01 + Clone + Send + 'static, { fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError03> { let future = future.unit_error().compat(); diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 6c8d8b1065..8cc15843a6 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -7,7 +7,7 @@ use crate::task::{AtomicWaker}; use futures_core::future::{Future, FutureObj, LocalFutureObj}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError}; -use core::cell::UnsafeCell; +use core::cell::{Cell, UnsafeCell}; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -61,8 +61,8 @@ const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value(); #[must_use = "streams do nothing unless polled"] pub struct FuturesUnordered { ready_to_run_queue: Arc>, - len: usize, - head_all: *const Task, + len: Cell, + head_all: Cell<*const Task>, } unsafe impl Send for FuturesUnordered {} @@ -70,7 +70,7 @@ unsafe impl Sync for FuturesUnordered {} impl Unpin for FuturesUnordered {} impl Spawn for FuturesUnordered> { - fn spawn_obj(&mut self, future_obj: FutureObj<'static, ()>) + fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); @@ -79,7 +79,7 @@ impl Spawn for FuturesUnordered> { } impl LocalSpawn for FuturesUnordered> { - fn spawn_local_obj(&mut self, future_obj: LocalFutureObj<'static, ()>) + fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); @@ -134,8 +134,8 @@ impl FuturesUnordered { }); FuturesUnordered { - len: 0, - head_all: ptr::null_mut(), + len: 0.into(), + head_all: Cell::from(ptr::null()), ready_to_run_queue, } } @@ -152,12 +152,14 @@ impl FuturesUnordered { /// /// This represents the total number of in-flight futures. pub fn len(&self) -> usize { - if self.len == TERMINATED_SENTINEL_LENGTH { 0 } else { self.len } + let len = self.len.get(); + if len == TERMINATED_SENTINEL_LENGTH { 0 } else { len } } /// Returns `true` if the set contains no futures. pub fn is_empty(&self) -> bool { - self.len == 0 || self.len == TERMINATED_SENTINEL_LENGTH + let len = self.len.get(); + len == 0 || len == TERMINATED_SENTINEL_LENGTH } /// Push a future into the set. @@ -166,7 +168,7 @@ impl FuturesUnordered { /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called /// in order to receive wake-up notifications for the given future. - pub fn push(&mut self, future: Fut) { + pub fn push(&self, future: Fut) { let task = Arc::new(Task { future: UnsafeCell::new(Some(future)), next_all: UnsafeCell::new(ptr::null_mut()), @@ -178,8 +180,8 @@ impl FuturesUnordered { // If we've previously marked ourselves as terminated we need to reset // len to 0 to track it correctly - if self.len == TERMINATED_SENTINEL_LENGTH { - self.len = 0; + if self.len.get() == TERMINATED_SENTINEL_LENGTH { + self.len.set(0); } // Right now our task has a strong reference count of 1. We transfer @@ -202,7 +204,7 @@ impl FuturesUnordered { /// Returns an iterator that allows inspecting each future in the set. fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { IterPinRef { - task: self.head_all, + task: self.head_all.get(), len: self.len(), _marker: PhantomData, } @@ -216,7 +218,7 @@ impl FuturesUnordered { /// Returns an iterator that allows modifying each future in the set. pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, Fut> { IterPinMut { - task: self.head_all, + task: self.head_all.get(), len: self.len(), _marker: PhantomData } @@ -263,17 +265,18 @@ impl FuturesUnordered { } /// Insert a new task into the internal linked list. - fn link(&mut self, task: Arc>) -> *const Task { + fn link(&self, task: Arc>) -> *const Task { let ptr = Arc::into_raw(task); unsafe { - *(*ptr).next_all.get() = self.head_all; - if !self.head_all.is_null() { - *(*self.head_all).prev_all.get() = ptr; + *(*ptr).next_all.get() = self.head_all.get(); + if !self.head_all.get().is_null() { + *(*self.head_all.get()).prev_all.get() = ptr; } } - self.head_all = ptr; - self.len += 1; + self.head_all.set(ptr); + let old_len = self.len.get(); + self.len.set(old_len + 1); ptr } @@ -295,9 +298,10 @@ impl FuturesUnordered { if !prev.is_null() { *(*prev).next_all.get() = next; } else { - self.head_all = next; + self.head_all.set(next); } - self.len -= 1; + let old_len = self.len.get(); + self.len.set(old_len - 1); task } } @@ -319,7 +323,7 @@ impl Stream for FuturesUnordered { if self.is_empty() { // We can only consider ourselves terminated once we // have yielded a `None` - self.len = TERMINATED_SENTINEL_LENGTH; + self.len.set(TERMINATED_SENTINEL_LENGTH); return Poll::Ready(None); } else { return Poll::Pending; @@ -461,8 +465,8 @@ impl Drop for FuturesUnordered { // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. unsafe { - while !self.head_all.is_null() { - let head = self.head_all; + while !self.head_all.get().is_null() { + let head = self.head_all.get(); let task = self.unlink(head); self.release_task(task); } @@ -489,12 +493,12 @@ impl FromIterator for FuturesUnordered { I: IntoIterator, { let acc = FuturesUnordered::new(); - iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) + iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) } } impl FusedStream for FuturesUnordered { fn is_terminated(&self) -> bool { - self.len == TERMINATED_SENTINEL_LENGTH + self.len.get() == TERMINATED_SENTINEL_LENGTH } } diff --git a/futures/tests/ready_queue.rs b/futures/tests/ready_queue.rs index 8005800514..1fa7d79064 100644 --- a/futures/tests/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -70,7 +70,7 @@ fn resolving_errors() { #[test] fn dropping_ready_queue() { block_on(future::lazy(move |_| { - let mut queue = FuturesUnordered::new(); + let queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); let (mut tx2, rx2) = oneshot::channel::<()>(); let (mut tx3, rx3) = oneshot::channel::<()>();