Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [Local]SpawnExt should take &self as their base traits #1959

Merged
merged 1 commit into from Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 21 additions & 23 deletions futures-executor/src/local_pool.rs
Expand Up @@ -2,12 +2,12 @@ use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
use futures_task::{waker_ref, ArcWake};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use futures_util::pin_mut;
use std::cell::{RefCell};
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
use std::sync::Arc;
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct LocalSpawner {
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;

pub(crate) struct ThreadNotify {
thread: Thread
thread: Thread,
}

thread_local! {
Expand All @@ -58,9 +58,10 @@ impl ArcWake for ThreadNotify {
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
Expand All @@ -75,9 +76,10 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
}

fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
Expand All @@ -98,7 +100,7 @@ impl LocalPool {
/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawner {
LocalSpawner {
incoming: Rc::downgrade(&self.incoming)
incoming: Rc::downgrade(&self.incoming),
}
}

Expand Down Expand Up @@ -164,7 +166,7 @@ impl LocalPool {
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
/// let spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
Expand Down Expand Up @@ -212,7 +214,7 @@ impl LocalPool {
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
/// let spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
Expand All @@ -229,7 +231,7 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
});
}
Expand Down Expand Up @@ -297,7 +299,9 @@ pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {

/// An iterator which blocks on values from a stream until they become available.
#[derive(Debug)]
pub struct BlockingStream<S: Stream + Unpin> { stream: S }
pub struct BlockingStream<S: Stream + Unpin> {
stream: S,
}

impl<S: Stream + Unpin> Deref for BlockingStream<S> {
type Target = S;
Expand Down Expand Up @@ -332,10 +336,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
}

impl Spawn for LocalSpawner {
fn spawn_obj(
&self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future.into());
Ok(())
Expand All @@ -354,10 +355,7 @@ impl Spawn for LocalSpawner {
}

impl LocalSpawn for LocalSpawner {
fn spawn_local_obj(
&self,
future: LocalFutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Ok(())
Expand Down
20 changes: 20 additions & 0 deletions futures-task/src/spawn.rs
Expand Up @@ -83,6 +83,16 @@ impl SpawnError {
}
}

impl<Sp: ?Sized + Spawn> Spawn for &Sp {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
}

fn status(&self) -> Result<(), SpawnError> {
Sp::status(self)
}
}

impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
Expand All @@ -103,6 +113,16 @@ impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &Sp {
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_local_obj(self, future)
}

fn status_local(&self) -> Result<(), SpawnError> {
Sp::status_local(self)
}
}

#[cfg(feature = "alloc")]
mod if_alloc {
use super::*;
Expand Down
9 changes: 3 additions & 6 deletions futures-test/src/task/noop_spawner.rs
@@ -1,4 +1,4 @@
use futures_task::{Spawn, SpawnError, FutureObj};
use futures_task::{FutureObj, Spawn, SpawnError};

/// An implementation of [`Spawn`](futures_task::Spawn) that
/// discards spawned futures when used.
Expand All @@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj};
/// use futures::task::SpawnExt;
/// use futures_test::task::NoopSpawner;
///
/// let mut spawner = NoopSpawner::new();
/// let spawner = NoopSpawner::new();
/// spawner.spawn(async { }).unwrap();
/// ```
#[derive(Debug)]
Expand All @@ -25,10 +25,7 @@ impl NoopSpawner {
}

impl Spawn for NoopSpawner {
fn spawn_obj(
&self,
_future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Ok(())
}
}
Expand Down
9 changes: 3 additions & 6 deletions futures-test/src/task/panic_spawner.rs
@@ -1,4 +1,4 @@
use futures_task::{Spawn, SpawnError, FutureObj};
use futures_task::{FutureObj, Spawn, SpawnError};

/// An implementation of [`Spawn`](futures_task::Spawn) that panics
/// when used.
Expand All @@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj};
/// use futures::task::SpawnExt;
/// use futures_test::task::PanicSpawner;
///
/// let mut spawn = PanicSpawner::new();
/// let spawn = PanicSpawner::new();
/// spawn.spawn(async { })?; // Will panic
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
Expand All @@ -26,10 +26,7 @@ impl PanicSpawner {
}

impl Spawn for PanicSpawner {
fn spawn_obj(
&self,
_future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
panic!("should not spawn")
}
}
Expand Down
9 changes: 3 additions & 6 deletions futures-test/src/task/record_spawner.rs
@@ -1,4 +1,4 @@
use futures_task::{Spawn, SpawnError, FutureObj};
use futures_task::{FutureObj, Spawn, SpawnError};
use std::cell::{Ref, RefCell};

/// An implementation of [`Spawn`](futures_task::Spawn) that records
Expand All @@ -10,7 +10,7 @@ use std::cell::{Ref, RefCell};
/// use futures::task::SpawnExt;
/// use futures_test::task::RecordSpawner;
///
/// let mut recorder = RecordSpawner::new();
/// let recorder = RecordSpawner::new();
/// recorder.spawn(async { }).unwrap();
/// assert_eq!(recorder.spawned().len(), 1);
/// ```
Expand All @@ -32,10 +32,7 @@ impl RecordSpawner {
}

impl Spawn for RecordSpawner {
fn spawn_obj(
&self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.spawned.borrow_mut().push(future);
Ok(())
}
Expand Down
35 changes: 15 additions & 20 deletions futures-util/src/compat/executor.rs
@@ -1,21 +1,18 @@

use super::{Compat, Future01CompatExt};
use crate::{
future::{FutureExt, TryFutureExt, UnitError},
task::SpawnExt,
};
use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
use futures_01::Future as Future01;
use futures_01::future::{Executor as Executor01, ExecuteError as ExecuteError01};
use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};

/// A future that can run on a futures 0.1
/// [`Executor`](futures_01::future::Executor).
pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;

/// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
pub trait Executor01CompatExt: Executor01<Executor01Future> +
Clone + Send + 'static
{
pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
/// futures 0.3 [`Spawn`](futures_task::Spawn).
///
Expand All @@ -27,7 +24,7 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> +
///
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
/// let mut spawner = DefaultExecutor::current().compat();
/// let spawner = DefaultExecutor::current().compat();
/// let future03 = async move {
/// println!("Running on the pool");
/// spawner.spawn(async {
Expand All @@ -42,39 +39,36 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> +
/// # futures::executor::block_on(rx).unwrap();
/// ```
fn compat(self) -> Executor01As03<Self>
where Self: Sized;
where
Self: Sized;
}

impl<Ex> Executor01CompatExt for Ex
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
where
Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
fn compat(self) -> Executor01As03<Self> {
Executor01As03 {
executor01: self,
}
Executor01As03 { executor01: self }
}
}

/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
/// futures 0.3 [`Spawn`](futures_task::Spawn).
#[derive(Debug, Clone)]
pub struct Executor01As03<Ex> {
executor01: Ex
executor01: Ex,
}

impl<Ex> Spawn03 for Executor01As03<Ex>
where
Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
fn spawn_obj(
&self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError03> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
let future = future.unit_error().compat();

self.executor01.execute(future).map_err(|_|
SpawnError03::shutdown()
)
self.executor01
.execute(future)
.map_err(|_| SpawnError03::shutdown())
}
}

Expand All @@ -85,7 +79,8 @@ where
Fut: Future01<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
(&self.inner).spawn(future.compat().map(|_| ()))
(&self.inner)
.spawn(future.compat().map(|_| ()))
.expect("unable to spawn future from Compat executor");
Ok(())
}
Expand Down