From 9474f162c2cd4ab960fc7a51928af3505958fab9 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 6 Nov 2019 22:51:02 +0100 Subject: [PATCH] fix: [Local]SpawnExt should take &self as their base traits Seems like an oversight in https://github.com/rust-lang-nursery/futures-rs/pull/1950 Technically a BREAKING CHANGE --- futures-executor/src/local_pool.rs | 44 ++++++++++++------------- futures-task/src/spawn.rs | 20 +++++++++++ futures-test/src/task/noop_spawner.rs | 9 ++--- futures-test/src/task/panic_spawner.rs | 9 ++--- futures-test/src/task/record_spawner.rs | 9 ++--- futures-util/src/compat/executor.rs | 35 +++++++++----------- futures-util/src/task/spawn.rs | 33 +++++++++---------- futures/tests/eventual.rs | 2 +- futures/tests/mutex.rs | 7 ++-- 9 files changed, 86 insertions(+), 82 deletions(-) diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index fea2f143e6..7c3a1477e1 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -2,12 +2,12 @@ use crate::enter; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError}; use futures_task::{waker_ref, ArcWake}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use futures_util::pin_mut; use futures_util::stream::FuturesUnordered; use futures_util::stream::StreamExt; -use futures_util::pin_mut; -use std::cell::{RefCell}; +use std::cell::RefCell; use std::ops::{Deref, DerefMut}; use std::rc::{Rc, Weak}; use std::sync::Arc; @@ -40,7 +40,7 @@ pub struct LocalSpawner { type Incoming = RefCell>>; pub(crate) struct ThreadNotify { - thread: Thread + thread: Thread, } thread_local! { @@ -58,9 +58,10 @@ impl ArcWake for ThreadNotify { // Set up and run a basic single-threaded spawner loop, invoking `f` on each // turn. fn run_executor) -> Poll>(mut f: F) -> T { - let _enter = enter() - .expect("cannot execute `LocalPool` executor from within \ - another executor"); + let _enter = enter().expect( + "cannot execute `LocalPool` executor from within \ + another executor", + ); CURRENT_THREAD_NOTIFY.with(|thread_notify| { let waker = waker_ref(thread_notify); @@ -75,9 +76,10 @@ fn run_executor) -> Poll>(mut f: F) -> T { } fn poll_executor) -> T>(mut f: F) -> T { - let _enter = enter() - .expect("cannot execute `LocalPool` executor from within \ - another executor"); + let _enter = enter().expect( + "cannot execute `LocalPool` executor from within \ + another executor", + ); CURRENT_THREAD_NOTIFY.with(|thread_notify| { let waker = waker_ref(thread_notify); @@ -98,7 +100,7 @@ impl LocalPool { /// Get a clonable handle to the pool as a [`Spawn`]. pub fn spawner(&self) -> LocalSpawner { LocalSpawner { - incoming: Rc::downgrade(&self.incoming) + incoming: Rc::downgrade(&self.incoming), } } @@ -164,7 +166,7 @@ impl LocalPool { /// use futures::future::{ready, pending}; /// /// let mut pool = LocalPool::new(); - /// let mut spawner = pool.spawner(); + /// let spawner = pool.spawner(); /// /// spawner.spawn_local(ready(())).unwrap(); /// spawner.spawn_local(ready(())).unwrap(); @@ -212,7 +214,7 @@ impl LocalPool { /// use futures::future::{ready, pending}; /// /// let mut pool = LocalPool::new(); - /// let mut spawner = pool.spawner(); + /// let spawner = pool.spawner(); /// /// spawner.spawn_local(ready(())).unwrap(); /// spawner.spawn_local(ready(())).unwrap(); @@ -229,7 +231,7 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { + poll_executor(|ctx| { let _ = self.poll_pool(ctx); }); } @@ -297,7 +299,9 @@ pub fn block_on_stream(stream: S) -> BlockingStream { /// An iterator which blocks on values from a stream until they become available. #[derive(Debug)] -pub struct BlockingStream { stream: S } +pub struct BlockingStream { + stream: S, +} impl Deref for BlockingStream { type Target = S; @@ -332,10 +336,7 @@ impl Iterator for BlockingStream { } impl Spawn for LocalSpawner { - fn spawn_obj( - &self, - future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { incoming.borrow_mut().push(future.into()); Ok(()) @@ -354,10 +355,7 @@ impl Spawn for LocalSpawner { } impl LocalSpawn for LocalSpawner { - fn spawn_local_obj( - &self, - future: LocalFutureObj<'static, ()>, - ) -> Result<(), SpawnError> { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { incoming.borrow_mut().push(future); Ok(()) diff --git a/futures-task/src/spawn.rs b/futures-task/src/spawn.rs index d59cc454a9..815411da8b 100644 --- a/futures-task/src/spawn.rs +++ b/futures-task/src/spawn.rs @@ -83,6 +83,16 @@ impl SpawnError { } } +impl Spawn for &Sp { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + Sp::spawn_obj(self, future) + } + + fn status(&self) -> Result<(), SpawnError> { + Sp::status(self) + } +} + impl Spawn for &mut Sp { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_obj(self, future) @@ -103,6 +113,16 @@ impl LocalSpawn for &Sp { } } +impl LocalSpawn for &mut Sp { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { + Sp::spawn_local_obj(self, future) + } + + fn status_local(&self) -> Result<(), SpawnError> { + Sp::status_local(self) + } +} + #[cfg(feature = "alloc")] mod if_alloc { use super::*; diff --git a/futures-test/src/task/noop_spawner.rs b/futures-test/src/task/noop_spawner.rs index 039a27ecec..8967f91ec4 100644 --- a/futures-test/src/task/noop_spawner.rs +++ b/futures-test/src/task/noop_spawner.rs @@ -1,4 +1,4 @@ -use futures_task::{Spawn, SpawnError, FutureObj}; +use futures_task::{FutureObj, Spawn, SpawnError}; /// An implementation of [`Spawn`](futures_task::Spawn) that /// discards spawned futures when used. @@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj}; /// use futures::task::SpawnExt; /// use futures_test::task::NoopSpawner; /// -/// let mut spawner = NoopSpawner::new(); +/// let spawner = NoopSpawner::new(); /// spawner.spawn(async { }).unwrap(); /// ``` #[derive(Debug)] @@ -25,10 +25,7 @@ impl NoopSpawner { } impl Spawn for NoopSpawner { - fn spawn_obj( - &self, - _future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { + fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> { Ok(()) } } diff --git a/futures-test/src/task/panic_spawner.rs b/futures-test/src/task/panic_spawner.rs index 19978c7463..e29463df6d 100644 --- a/futures-test/src/task/panic_spawner.rs +++ b/futures-test/src/task/panic_spawner.rs @@ -1,4 +1,4 @@ -use futures_task::{Spawn, SpawnError, FutureObj}; +use futures_task::{FutureObj, Spawn, SpawnError}; /// An implementation of [`Spawn`](futures_task::Spawn) that panics /// when used. @@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj}; /// use futures::task::SpawnExt; /// use futures_test::task::PanicSpawner; /// -/// let mut spawn = PanicSpawner::new(); +/// let spawn = PanicSpawner::new(); /// spawn.spawn(async { })?; // Will panic /// # Ok::<(), Box>(()) /// ``` @@ -26,10 +26,7 @@ impl PanicSpawner { } impl Spawn for PanicSpawner { - fn spawn_obj( - &self, - _future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { + fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> { panic!("should not spawn") } } diff --git a/futures-test/src/task/record_spawner.rs b/futures-test/src/task/record_spawner.rs index ccc2dbc406..59539fa217 100644 --- a/futures-test/src/task/record_spawner.rs +++ b/futures-test/src/task/record_spawner.rs @@ -1,4 +1,4 @@ -use futures_task::{Spawn, SpawnError, FutureObj}; +use futures_task::{FutureObj, Spawn, SpawnError}; use std::cell::{Ref, RefCell}; /// An implementation of [`Spawn`](futures_task::Spawn) that records @@ -10,7 +10,7 @@ use std::cell::{Ref, RefCell}; /// use futures::task::SpawnExt; /// use futures_test::task::RecordSpawner; /// -/// let mut recorder = RecordSpawner::new(); +/// let recorder = RecordSpawner::new(); /// recorder.spawn(async { }).unwrap(); /// assert_eq!(recorder.spawned().len(), 1); /// ``` @@ -32,10 +32,7 @@ impl RecordSpawner { } impl Spawn for RecordSpawner { - fn spawn_obj( - &self, - future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.spawned.borrow_mut().push(future); Ok(()) } diff --git a/futures-util/src/compat/executor.rs b/futures-util/src/compat/executor.rs index 230bff51ee..82cb496a70 100644 --- a/futures-util/src/compat/executor.rs +++ b/futures-util/src/compat/executor.rs @@ -1,11 +1,10 @@ - use super::{Compat, Future01CompatExt}; use crate::{ future::{FutureExt, TryFutureExt, UnitError}, task::SpawnExt, }; +use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01}; use futures_01::Future as Future01; -use futures_01::future::{Executor as Executor01, ExecuteError as ExecuteError01}; use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03}; /// A future that can run on a futures 0.1 @@ -13,9 +12,7 @@ use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03}; pub type Executor01Future = Compat>>; /// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor). -pub trait Executor01CompatExt: Executor01 + - Clone + Send + 'static -{ +pub trait Executor01CompatExt: Executor01 + Clone + Send + 'static { /// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a /// futures 0.3 [`Spawn`](futures_task::Spawn). /// @@ -27,7 +24,7 @@ pub trait Executor01CompatExt: Executor01 + /// /// # let (tx, rx) = futures::channel::oneshot::channel(); /// - /// let mut spawner = DefaultExecutor::current().compat(); + /// let spawner = DefaultExecutor::current().compat(); /// let future03 = async move { /// println!("Running on the pool"); /// spawner.spawn(async { @@ -42,16 +39,16 @@ pub trait Executor01CompatExt: Executor01 + /// # futures::executor::block_on(rx).unwrap(); /// ``` fn compat(self) -> Executor01As03 - where Self: Sized; + where + Self: Sized; } impl Executor01CompatExt for Ex -where Ex: Executor01 + Clone + Send + 'static +where + Ex: Executor01 + Clone + Send + 'static, { fn compat(self) -> Executor01As03 { - Executor01As03 { - executor01: self, - } + Executor01As03 { executor01: self } } } @@ -59,22 +56,19 @@ where Ex: Executor01 + Clone + Send + 'static /// futures 0.3 [`Spawn`](futures_task::Spawn). #[derive(Debug, Clone)] pub struct Executor01As03 { - executor01: Ex + executor01: Ex, } impl Spawn03 for Executor01As03 where Ex: Executor01 + Clone + Send + 'static, { - fn spawn_obj( - &self, - future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError03> { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> { let future = future.unit_error().compat(); - self.executor01.execute(future).map_err(|_| - SpawnError03::shutdown() - ) + self.executor01 + .execute(future) + .map_err(|_| SpawnError03::shutdown()) } } @@ -85,7 +79,8 @@ where Fut: Future01 + Send + 'static, { fn execute(&self, future: Fut) -> Result<(), ExecuteError01> { - (&self.inner).spawn(future.compat().map(|_| ())) + (&self.inner) + .spawn(future.compat().map(|_| ())) .expect("unable to spawn future from Compat executor"); Ok(()) } diff --git a/futures-util/src/task/spawn.rs b/futures-util/src/task/spawn.rs index ba2b4be026..f34445a030 100644 --- a/futures-util/src/task/spawn.rs +++ b/futures-util/src/task/spawn.rs @@ -1,16 +1,17 @@ use futures_task::{LocalSpawn, Spawn}; -#[cfg(feature = "compat")] use crate::compat::Compat; +#[cfg(feature = "compat")] +use crate::compat::Compat; #[cfg(feature = "channel")] #[cfg(feature = "std")] use crate::future::{FutureExt, RemoteHandle}; #[cfg(feature = "alloc")] -use futures_core::future::Future; +use alloc::boxed::Box; #[cfg(feature = "alloc")] -use futures_task::{SpawnError, FutureObj, LocalFutureObj}; +use futures_core::future::Future; #[cfg(feature = "alloc")] -use alloc::boxed::Box; +use futures_task::{FutureObj, LocalFutureObj, SpawnError}; impl SpawnExt for Sp where Sp: Spawn {} impl LocalSpawnExt for Sp where Sp: LocalSpawn {} @@ -36,13 +37,13 @@ pub trait SpawnExt: Spawn { /// use futures::executor::ThreadPool; /// use futures::task::SpawnExt; /// - /// let mut executor = ThreadPool::new().unwrap(); + /// let executor = ThreadPool::new().unwrap(); /// /// let future = async { /* ... */ }; /// executor.spawn(future).unwrap(); /// ``` #[cfg(feature = "alloc")] - fn spawn(&mut self, future: Fut) -> Result<(), SpawnError> + fn spawn(&self, future: Fut) -> Result<(), SpawnError> where Fut: Future + Send + 'static, { @@ -61,7 +62,7 @@ pub trait SpawnExt: Spawn { /// use futures::future; /// use futures::task::SpawnExt; /// - /// let mut executor = ThreadPool::new().unwrap(); + /// let executor = ThreadPool::new().unwrap(); /// /// let future = future::ready(1); /// let join_handle_fut = executor.spawn_with_handle(future).unwrap(); @@ -69,10 +70,7 @@ pub trait SpawnExt: Spawn { /// ``` #[cfg(feature = "channel")] #[cfg(feature = "std")] - fn spawn_with_handle( - &mut self, - future: Fut - ) -> Result, SpawnError> + fn spawn_with_handle(&self, future: Fut) -> Result, SpawnError> where Fut: Future + Send + 'static, Fut::Output: Send, @@ -86,7 +84,8 @@ pub trait SpawnExt: Spawn { /// Requires the `compat` feature to enable. #[cfg(feature = "compat")] fn compat(self) -> Compat - where Self: Sized, + where + Self: Sized, { Compat::new(self) } @@ -114,13 +113,13 @@ pub trait LocalSpawnExt: LocalSpawn { /// use futures::task::LocalSpawnExt; /// /// let executor = LocalPool::new(); - /// let mut spawner = executor.spawner(); + /// let spawner = executor.spawner(); /// /// let future = async { /* ... */ }; /// spawner.spawn_local(future).unwrap(); /// ``` #[cfg(feature = "alloc")] - fn spawn_local(&mut self, future: Fut) -> Result<(), SpawnError> + fn spawn_local(&self, future: Fut) -> Result<(), SpawnError> where Fut: Future + 'static, { @@ -139,7 +138,7 @@ pub trait LocalSpawnExt: LocalSpawn { /// use futures::task::LocalSpawnExt; /// /// let mut executor = LocalPool::new(); - /// let mut spawner = executor.spawner(); + /// let spawner = executor.spawner(); /// /// let future = async { 1 }; /// let join_handle_fut = spawner.spawn_local_with_handle(future).unwrap(); @@ -148,8 +147,8 @@ pub trait LocalSpawnExt: LocalSpawn { #[cfg(feature = "channel")] #[cfg(feature = "std")] fn spawn_local_with_handle( - &mut self, - future: Fut + &self, + future: Fut, ) -> Result, SpawnError> where Fut: Future + 'static, diff --git a/futures/tests/eventual.rs b/futures/tests/eventual.rs index 12b26aafa5..bff000dd09 100644 --- a/futures/tests/eventual.rs +++ b/futures/tests/eventual.rs @@ -6,7 +6,7 @@ use std::sync::mpsc; use std::thread; fn run(future: F) { - let mut tp = ThreadPool::new().unwrap(); + let tp = ThreadPool::new().unwrap(); tp.spawn(future.map(drop)).unwrap(); } diff --git a/futures/tests/mutex.rs b/futures/tests/mutex.rs index 2e49a87933..80a6e65d66 100644 --- a/futures/tests/mutex.rs +++ b/futures/tests/mutex.rs @@ -5,7 +5,7 @@ use futures::lock::Mutex; use futures::stream::StreamExt; use futures::task::{Context, SpawnExt}; use futures_test::future::FutureTestExt; -use futures_test::task::{panic_context, new_count_waker}; +use futures_test::task::{new_count_waker, panic_context}; use std::sync::Arc; #[test] @@ -37,7 +37,7 @@ fn mutex_wakes_waiters() { #[test] fn mutex_contested() { let (tx, mut rx) = mpsc::unbounded(); - let mut pool = futures::executor::ThreadPool::builder() + let pool = futures::executor::ThreadPool::builder() .pool_size(16) .create() .unwrap(); @@ -55,7 +55,8 @@ fn mutex_contested() { *lock += 1; tx.unbounded_send(()).unwrap(); drop(lock); - }).unwrap(); + }) + .unwrap(); } block_on(async {