Skip to content

Commit

Permalink
fix: [Local]SpawnExt should take &self as their base traits
Browse files Browse the repository at this point in the history
Seems like an oversight in rust-lang#1950

Technically a

BREAKING CHANGE
  • Loading branch information
Marwes committed Nov 6, 2019
1 parent 0c7fa20 commit 9474f16
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 82 deletions.
44 changes: 21 additions & 23 deletions futures-executor/src/local_pool.rs
Expand Up @@ -2,12 +2,12 @@ use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
use futures_task::{waker_ref, ArcWake};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use futures_util::pin_mut;
use std::cell::{RefCell};
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
use std::sync::Arc;
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct LocalSpawner {
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;

pub(crate) struct ThreadNotify {
thread: Thread
thread: Thread,
}

thread_local! {
Expand All @@ -58,9 +58,10 @@ impl ArcWake for ThreadNotify {
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
Expand All @@ -75,9 +76,10 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
}

fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
Expand All @@ -98,7 +100,7 @@ impl LocalPool {
/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawner {
LocalSpawner {
incoming: Rc::downgrade(&self.incoming)
incoming: Rc::downgrade(&self.incoming),
}
}

Expand Down Expand Up @@ -164,7 +166,7 @@ impl LocalPool {
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
/// let spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
Expand Down Expand Up @@ -212,7 +214,7 @@ impl LocalPool {
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
/// let spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
Expand All @@ -229,7 +231,7 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
});
}
Expand Down Expand Up @@ -297,7 +299,9 @@ pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {

/// An iterator which blocks on values from a stream until they become available.
#[derive(Debug)]
pub struct BlockingStream<S: Stream + Unpin> { stream: S }
pub struct BlockingStream<S: Stream + Unpin> {
stream: S,
}

impl<S: Stream + Unpin> Deref for BlockingStream<S> {
type Target = S;
Expand Down Expand Up @@ -332,10 +336,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
}

impl Spawn for LocalSpawner {
fn spawn_obj(
&self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future.into());
Ok(())
Expand All @@ -354,10 +355,7 @@ impl Spawn for LocalSpawner {
}

impl LocalSpawn for LocalSpawner {
fn spawn_local_obj(
&self,
future: LocalFutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Ok(())
Expand Down
20 changes: 20 additions & 0 deletions futures-task/src/spawn.rs
Expand Up @@ -83,6 +83,16 @@ impl SpawnError {
}
}

impl<Sp: ?Sized + Spawn> Spawn for &Sp {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
}

fn status(&self) -> Result<(), SpawnError> {
Sp::status(self)
}
}

impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
Expand All @@ -103,6 +113,16 @@ impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &Sp {
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_local_obj(self, future)
}

fn status_local(&self) -> Result<(), SpawnError> {
Sp::status_local(self)
}
}

#[cfg(feature = "alloc")]
mod if_alloc {
use super::*;
Expand Down
9 changes: 3 additions & 6 deletions futures-test/src/task/noop_spawner.rs
@@ -1,4 +1,4 @@
use futures_task::{Spawn, SpawnError, FutureObj};
use futures_task::{FutureObj, Spawn, SpawnError};

/// An implementation of [`Spawn`](futures_task::Spawn) that
/// discards spawned futures when used.
Expand All @@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj};
/// use futures::task::SpawnExt;
/// use futures_test::task::NoopSpawner;
///
/// let mut spawner = NoopSpawner::new();
/// let spawner = NoopSpawner::new();
/// spawner.spawn(async { }).unwrap();
/// ```
#[derive(Debug)]
Expand All @@ -25,10 +25,7 @@ impl NoopSpawner {
}

impl Spawn for NoopSpawner {
fn spawn_obj(
&self,
_future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Ok(())
}
}
Expand Down
9 changes: 3 additions & 6 deletions futures-test/src/task/panic_spawner.rs
@@ -1,4 +1,4 @@
use futures_task::{Spawn, SpawnError, FutureObj};
use futures_task::{FutureObj, Spawn, SpawnError};

/// An implementation of [`Spawn`](futures_task::Spawn) that panics
/// when used.
Expand All @@ -9,7 +9,7 @@ use futures_task::{Spawn, SpawnError, FutureObj};
/// use futures::task::SpawnExt;
/// use futures_test::task::PanicSpawner;
///
/// let mut spawn = PanicSpawner::new();
/// let spawn = PanicSpawner::new();
/// spawn.spawn(async { })?; // Will panic
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
Expand All @@ -26,10 +26,7 @@ impl PanicSpawner {
}

impl Spawn for PanicSpawner {
fn spawn_obj(
&self,
_future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, _future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
panic!("should not spawn")
}
}
Expand Down
9 changes: 3 additions & 6 deletions futures-test/src/task/record_spawner.rs
@@ -1,4 +1,4 @@
use futures_task::{Spawn, SpawnError, FutureObj};
use futures_task::{FutureObj, Spawn, SpawnError};
use std::cell::{Ref, RefCell};

/// An implementation of [`Spawn`](futures_task::Spawn) that records
Expand All @@ -10,7 +10,7 @@ use std::cell::{Ref, RefCell};
/// use futures::task::SpawnExt;
/// use futures_test::task::RecordSpawner;
///
/// let mut recorder = RecordSpawner::new();
/// let recorder = RecordSpawner::new();
/// recorder.spawn(async { }).unwrap();
/// assert_eq!(recorder.spawned().len(), 1);
/// ```
Expand All @@ -32,10 +32,7 @@ impl RecordSpawner {
}

impl Spawn for RecordSpawner {
fn spawn_obj(
&self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.spawned.borrow_mut().push(future);
Ok(())
}
Expand Down
35 changes: 15 additions & 20 deletions futures-util/src/compat/executor.rs
@@ -1,21 +1,18 @@

use super::{Compat, Future01CompatExt};
use crate::{
future::{FutureExt, TryFutureExt, UnitError},
task::SpawnExt,
};
use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
use futures_01::Future as Future01;
use futures_01::future::{Executor as Executor01, ExecuteError as ExecuteError01};
use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};

/// A future that can run on a futures 0.1
/// [`Executor`](futures_01::future::Executor).
pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;

/// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
pub trait Executor01CompatExt: Executor01<Executor01Future> +
Clone + Send + 'static
{
pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
/// futures 0.3 [`Spawn`](futures_task::Spawn).
///
Expand All @@ -27,7 +24,7 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> +
///
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
/// let mut spawner = DefaultExecutor::current().compat();
/// let spawner = DefaultExecutor::current().compat();
/// let future03 = async move {
/// println!("Running on the pool");
/// spawner.spawn(async {
Expand All @@ -42,39 +39,36 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> +
/// # futures::executor::block_on(rx).unwrap();
/// ```
fn compat(self) -> Executor01As03<Self>
where Self: Sized;
where
Self: Sized;
}

impl<Ex> Executor01CompatExt for Ex
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
where
Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
fn compat(self) -> Executor01As03<Self> {
Executor01As03 {
executor01: self,
}
Executor01As03 { executor01: self }
}
}

/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
/// futures 0.3 [`Spawn`](futures_task::Spawn).
#[derive(Debug, Clone)]
pub struct Executor01As03<Ex> {
executor01: Ex
executor01: Ex,
}

impl<Ex> Spawn03 for Executor01As03<Ex>
where
Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
fn spawn_obj(
&self,
future: FutureObj<'static, ()>,
) -> Result<(), SpawnError03> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
let future = future.unit_error().compat();

self.executor01.execute(future).map_err(|_|
SpawnError03::shutdown()
)
self.executor01
.execute(future)
.map_err(|_| SpawnError03::shutdown())
}
}

Expand All @@ -85,7 +79,8 @@ where
Fut: Future01<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
(&self.inner).spawn(future.compat().map(|_| ()))
(&self.inner)
.spawn(future.compat().map(|_| ()))
.expect("unable to spawn future from Compat executor");
Ok(())
}
Expand Down

0 comments on commit 9474f16

Please sign in to comment.