diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 84795dbfc0..fea2f143e6 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -333,7 +333,7 @@ impl Iterator for BlockingStream { impl Spawn for LocalSpawner { fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { @@ -355,7 +355,7 @@ impl Spawn for LocalSpawner { impl LocalSpawn for LocalSpawner { fn spawn_local_obj( - &mut self, + &self, future: LocalFutureObj<'static, ()>, ) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index f0192e86ec..1f2a567f84 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -131,17 +131,7 @@ impl ThreadPool { impl Spawn for ThreadPool { fn spawn_obj( - &mut self, - future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { - self.spawn_obj_ok(future); - Ok(()) - } -} - -impl Spawn for &ThreadPool { - fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { self.spawn_obj_ok(future); diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 9663d03a66..b6a5678388 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -39,7 +39,7 @@ fn run_until_single_future() { #[test] fn run_until_ignores_spawned() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); assert_eq!(pool.run_until(lazy(|_| ())), ()); } @@ -48,7 +48,7 @@ fn run_until_ignores_spawned() { fn run_until_executes_spawned() { let (tx, rx) = oneshot::channel(); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(lazy(move |_| { tx.send(()).unwrap(); () @@ -69,8 +69,8 @@ fn run_executes_spawned() { let cnt2 = cnt.clone(); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); - let mut spawn2 = pool.spawner(); + let spawn = pool.spawner(); + let spawn2 = pool.spawner(); spawn.spawn_local_obj(Box::pin(lazy(move |_| { spawn2.spawn_local_obj(Box::pin(lazy(move |_| { @@ -93,7 +93,7 @@ fn run_spawn_many() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); for _ in 0..ITER { let cnt = cnt.clone(); @@ -121,7 +121,7 @@ fn try_run_one_executes_one_ready() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); for _ in 0..ITER { spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); @@ -150,7 +150,7 @@ fn try_run_one_returns_on_no_progress() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); let waker: Rc>> = Rc::new(Cell::new(None)); { @@ -182,10 +182,10 @@ fn try_run_one_returns_on_no_progress() { #[test] fn try_run_one_runs_sub_futures() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); - let mut inner_spawner = spawn.clone(); + let inner_spawner = spawn.clone(); let cnt1 = cnt.clone(); spawn.spawn_local_obj(Box::pin(poll_fn(move |_| { cnt1.set(cnt1.get() + 1); @@ -212,7 +212,7 @@ fn run_until_stalled_returns_if_empty() { #[test] fn run_until_stalled_returns_multiple_times() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); let cnt1 = cnt.clone(); @@ -229,10 +229,10 @@ fn run_until_stalled_returns_multiple_times() { #[test] fn run_until_stalled_runs_spawned_sub_futures() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); - let mut inner_spawner = spawn.clone(); + let inner_spawner = spawn.clone(); let cnt1 = cnt.clone(); spawn.spawn_local_obj(Box::pin(poll_fn(move |_| { cnt1.set(cnt1.get() + 1); @@ -257,7 +257,7 @@ fn run_until_stalled_executes_all_ready() { let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); for i in 0..ITER { for _ in 0..PER_ITER { @@ -282,7 +282,7 @@ fn run_until_stalled_executes_all_ready() { #[should_panic] fn nesting_run() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_obj(Box::pin(lazy(|_| { let mut pool = LocalPool::new(); @@ -296,7 +296,7 @@ fn nesting_run() { #[should_panic] fn nesting_run_run_until_stalled() { let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_obj(Box::pin(lazy(|_| { let mut pool = LocalPool::new(); @@ -343,7 +343,7 @@ fn tasks_are_scheduled_fairly() { } let mut pool = LocalPool::new(); - let mut spawn = pool.spawner(); + let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), diff --git a/futures-task/src/spawn.rs b/futures-task/src/spawn.rs index 9616e190ae..d59cc454a9 100644 --- a/futures-task/src/spawn.rs +++ b/futures-task/src/spawn.rs @@ -12,7 +12,7 @@ pub trait Spawn { /// represent relatively rare scenarios, such as the executor /// having been shut down so that it is no longer able to accept /// tasks. - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError>; + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError>; /// Determines whether the executor is able to spawn new tasks. /// @@ -37,7 +37,7 @@ pub trait LocalSpawn { /// represent relatively rare scenarios, such as the executor /// having been shut down so that it is no longer able to accept /// tasks. - fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>; + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>; /// Determines whether the executor is able to spawn new tasks. /// @@ -84,7 +84,7 @@ impl SpawnError { } impl Spawn for &mut Sp { - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_obj(self, future) } @@ -93,8 +93,8 @@ impl Spawn for &mut Sp { } } -impl LocalSpawn for &mut Sp { - fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { +impl LocalSpawn for &Sp { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_local_obj(self, future) } @@ -109,7 +109,7 @@ mod if_alloc { use alloc::boxed::Box; impl Spawn for Box { - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { (**self).spawn_obj(future) } @@ -119,7 +119,7 @@ mod if_alloc { } impl LocalSpawn for Box { - fn spawn_local_obj(&mut self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { (**self).spawn_local_obj(future) } diff --git a/futures-test/src/task/noop_spawner.rs b/futures-test/src/task/noop_spawner.rs index 517a15b17d..039a27ecec 100644 --- a/futures-test/src/task/noop_spawner.rs +++ b/futures-test/src/task/noop_spawner.rs @@ -26,7 +26,7 @@ impl NoopSpawner { impl Spawn for NoopSpawner { fn spawn_obj( - &mut self, + &self, _future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { Ok(()) diff --git a/futures-test/src/task/panic_spawner.rs b/futures-test/src/task/panic_spawner.rs index 4766ee7171..19978c7463 100644 --- a/futures-test/src/task/panic_spawner.rs +++ b/futures-test/src/task/panic_spawner.rs @@ -27,7 +27,7 @@ impl PanicSpawner { impl Spawn for PanicSpawner { fn spawn_obj( - &mut self, + &self, _future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { panic!("should not spawn") diff --git a/futures-test/src/task/record_spawner.rs b/futures-test/src/task/record_spawner.rs index 3db983d5b8..ccc2dbc406 100644 --- a/futures-test/src/task/record_spawner.rs +++ b/futures-test/src/task/record_spawner.rs @@ -1,4 +1,5 @@ use futures_task::{Spawn, SpawnError, FutureObj}; +use std::cell::{Ref, RefCell}; /// An implementation of [`Spawn`](futures_task::Spawn) that records /// any [`Future`](futures_core::future::Future)s spawned on it. @@ -13,37 +14,29 @@ use futures_task::{Spawn, SpawnError, FutureObj}; /// recorder.spawn(async { }).unwrap(); /// assert_eq!(recorder.spawned().len(), 1); /// ``` -#[derive(Debug)] +#[derive(Debug, Default)] pub struct RecordSpawner { - spawned: Vec>, + spawned: RefCell>>, } impl RecordSpawner { /// Create a new instance pub fn new() -> Self { - Self { - spawned: Vec::new(), - } + Default::default() } /// Inspect any futures that were spawned onto this [`Spawn`]. - pub fn spawned(&self) -> &[FutureObj<'static, ()>] { - &self.spawned + pub fn spawned(&self) -> Ref<'_, Vec>> { + self.spawned.borrow() } } impl Spawn for RecordSpawner { fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { - self.spawned.push(future); + self.spawned.borrow_mut().push(future); Ok(()) } } - -impl Default for RecordSpawner { - fn default() -> Self { - Self::new() - } -} diff --git a/futures-util/src/compat/executor.rs b/futures-util/src/compat/executor.rs index 2df900c45f..230bff51ee 100644 --- a/futures-util/src/compat/executor.rs +++ b/futures-util/src/compat/executor.rs @@ -67,7 +67,7 @@ where Ex: Executor01 + Clone + Send + 'static, { fn spawn_obj( - &mut self, + &self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError03> { let future = future.unit_error().compat(); diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index ff4efa8321..8dc710cd34 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -8,7 +8,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError}; use crate::task::AtomicWaker; -use core::cell::UnsafeCell; +use core::cell::{Cell, UnsafeCell}; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -62,8 +62,8 @@ const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value(); #[must_use = "streams do nothing unless polled"] pub struct FuturesUnordered { ready_to_run_queue: Arc>, - len: usize, - head_all: *const Task, + len: Cell, + head_all: Cell<*const Task>, } unsafe impl Send for FuturesUnordered {} @@ -71,7 +71,7 @@ unsafe impl Sync for FuturesUnordered {} impl Unpin for FuturesUnordered {} impl Spawn for FuturesUnordered> { - fn spawn_obj(&mut self, future_obj: FutureObj<'static, ()>) + fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); @@ -80,7 +80,7 @@ impl Spawn for FuturesUnordered> { } impl LocalSpawn for FuturesUnordered> { - fn spawn_local_obj(&mut self, future_obj: LocalFutureObj<'static, ()>) + fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); @@ -135,8 +135,8 @@ impl FuturesUnordered { }); FuturesUnordered { - len: 0, - head_all: ptr::null_mut(), + len: 0.into(), + head_all: Cell::from(ptr::null()), ready_to_run_queue, } } @@ -153,12 +153,14 @@ impl FuturesUnordered { /// /// This represents the total number of in-flight futures. pub fn len(&self) -> usize { - if self.len == TERMINATED_SENTINEL_LENGTH { 0 } else { self.len } + let len = self.len.get(); + if len == TERMINATED_SENTINEL_LENGTH { 0 } else { len } } /// Returns `true` if the set contains no futures. pub fn is_empty(&self) -> bool { - self.len == 0 || self.len == TERMINATED_SENTINEL_LENGTH + let len = self.len.get(); + len == 0 || len == TERMINATED_SENTINEL_LENGTH } /// Push a future into the set. @@ -167,7 +169,7 @@ impl FuturesUnordered { /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called /// in order to receive wake-up notifications for the given future. - pub fn push(&mut self, future: Fut) { + pub fn push(&self, future: Fut) { let task = Arc::new(Task { future: UnsafeCell::new(Some(future)), next_all: UnsafeCell::new(ptr::null_mut()), @@ -179,8 +181,8 @@ impl FuturesUnordered { // If we've previously marked ourselves as terminated we need to reset // len to 0 to track it correctly - if self.len == TERMINATED_SENTINEL_LENGTH { - self.len = 0; + if self.len.get() == TERMINATED_SENTINEL_LENGTH { + self.len.set(0); } // Right now our task has a strong reference count of 1. We transfer @@ -203,7 +205,7 @@ impl FuturesUnordered { /// Returns an iterator that allows inspecting each future in the set. fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { IterPinRef { - task: self.head_all, + task: self.head_all.get(), len: self.len(), _marker: PhantomData, } @@ -217,7 +219,7 @@ impl FuturesUnordered { /// Returns an iterator that allows modifying each future in the set. pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, Fut> { IterPinMut { - task: self.head_all, + task: self.head_all.get(), len: self.len(), _marker: PhantomData } @@ -264,17 +266,18 @@ impl FuturesUnordered { } /// Insert a new task into the internal linked list. - fn link(&mut self, task: Arc>) -> *const Task { + fn link(&self, task: Arc>) -> *const Task { let ptr = Arc::into_raw(task); unsafe { - *(*ptr).next_all.get() = self.head_all; - if !self.head_all.is_null() { - *(*self.head_all).prev_all.get() = ptr; + *(*ptr).next_all.get() = self.head_all.get(); + if !self.head_all.get().is_null() { + *(*self.head_all.get()).prev_all.get() = ptr; } } - self.head_all = ptr; - self.len += 1; + self.head_all.set(ptr); + let old_len = self.len.get(); + self.len.set(old_len + 1); ptr } @@ -296,9 +299,10 @@ impl FuturesUnordered { if !prev.is_null() { *(*prev).next_all.get() = next; } else { - self.head_all = next; + self.head_all.set(next); } - self.len -= 1; + let old_len = self.len.get(); + self.len.set(old_len - 1); task } } @@ -320,7 +324,7 @@ impl Stream for FuturesUnordered { if self.is_empty() { // We can only consider ourselves terminated once we // have yielded a `None` - self.len = TERMINATED_SENTINEL_LENGTH; + self.len.set(TERMINATED_SENTINEL_LENGTH); return Poll::Ready(None); } else { return Poll::Pending; @@ -462,8 +466,8 @@ impl Drop for FuturesUnordered { // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. unsafe { - while !self.head_all.is_null() { - let head = self.head_all; + while !self.head_all.get().is_null() { + let head = self.head_all.get(); let task = self.unlink(head); self.release_task(task); } @@ -490,12 +494,12 @@ impl FromIterator for FuturesUnordered { I: IntoIterator, { let acc = FuturesUnordered::new(); - iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) + iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) } } impl FusedStream for FuturesUnordered { fn is_terminated(&self) -> bool { - self.len == TERMINATED_SENTINEL_LENGTH + self.len.get() == TERMINATED_SENTINEL_LENGTH } } diff --git a/futures/tests/ready_queue.rs b/futures/tests/ready_queue.rs index 8005800514..1fa7d79064 100644 --- a/futures/tests/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -70,7 +70,7 @@ fn resolving_errors() { #[test] fn dropping_ready_queue() { block_on(future::lazy(move |_| { - let mut queue = FuturesUnordered::new(); + let queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); let (mut tx2, rx2) = oneshot::channel::<()>(); let (mut tx3, rx3) = oneshot::channel::<()>();