Skip to content

Commit

Permalink
Add BoundLocalPool and BoundLocalSpawner
Browse files Browse the repository at this point in the history
Adds derivatives of LocalPool and BoundLocalSpawner with a generic
lifetime parameter that replaces the previously used 'static, and
aliases both types with a 'static parameter to the originals.

Add a BoundLocalSpawn trait in a similar fashion. At the same time
simplify the implementation of Spawn, LocalSpawn and BoundLocalSpawn to
just work on any type implementing Deref to a type implementing a spawn
trait.
  • Loading branch information
haroldbruintjes committed Mar 28, 2024
1 parent 48b58c0 commit 81bcdac
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 112 deletions.
5 changes: 4 additions & 1 deletion futures-executor/src/lib.rs
Expand Up @@ -54,7 +54,10 @@ extern crate std;
#[cfg(feature = "std")]
mod local_pool;
#[cfg(feature = "std")]
pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner};
pub use crate::local_pool::{
block_on, block_on_stream, BlockingStream, BoundLocalPool, BoundLocalSpawner, LocalPool,
LocalSpawner,
};

#[cfg(feature = "thread-pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
Expand Down
69 changes: 52 additions & 17 deletions futures-executor/src/local_pool.rs
Expand Up @@ -2,7 +2,7 @@ use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{waker_ref, ArcWake};
use futures_task::{waker_ref, ArcWake, BoundLocalSpawn};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
Expand All @@ -17,6 +17,36 @@ use std::sync::{
use std::thread::{self, Thread};
use std::vec::Vec;

/// A single-threaded task pool with bound lifetime for polling futures to
/// completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
/// little work in between I/O actions. The lifetime of the executor is bound by
/// a generic parameter. Futures associated with the executor need only outlive
/// this lifetime. That uncompleted futures are dropped when the lifetime of the
/// executor expires.
///
/// To get a handle to the pool that implements [`Spawn`](futures_task::Spawn),
/// use the [`spawner()`](BoundLocalPool::spawner) method. Because the executor
/// is single-threaded, it supports a special form of task spawning for
/// non-`Send` futures, via
/// [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
/// Additionally, tasks with a limited lifetime can be spawned via
/// [`spawn_bound_local_obj`](futures_task::BoundLocalSpawn::spawn_bound_local_obj).
#[derive(Debug)]
pub struct BoundLocalPool<'a> {
pool: FuturesUnordered<LocalFutureObj<'a, ()>>,
incoming: Rc<Incoming<'a>>,
}

/// A handle to a [`BoundLocalPool`] that implements
/// [`BoundLocalSpawn`](futures_task::BoundLocalSpawn).
#[derive(Clone, Debug)]
pub struct BoundLocalSpawner<'a> {
incoming: Weak<Incoming<'a>>,
}

/// A single-threaded task pool for polling futures to completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
Expand All @@ -28,19 +58,13 @@ use std::vec::Vec;
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>,
}
pub type LocalPool = BoundLocalPool<'static>;

/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn).
#[derive(Clone, Debug)]
pub struct LocalSpawner {
incoming: Weak<Incoming>,
}
/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn)
/// and [`LocalSpawn`](futures_task::LocalSpawn).
pub type LocalSpawner = BoundLocalSpawner<'static>;

type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
type Incoming<'a> = RefCell<Vec<LocalFutureObj<'a, ()>>>;

pub(crate) struct ThreadNotify {
/// The (single) executor thread.
Expand Down Expand Up @@ -107,15 +131,15 @@ fn woken() -> bool {
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
}

impl LocalPool {
impl<'a> BoundLocalPool<'a> {
/// Create a new, empty pool of tasks.
pub fn new() -> Self {
Self { pool: FuturesUnordered::new(), incoming: Default::default() }
}

/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawner {
LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
pub fn spawner(&self) -> BoundLocalSpawner<'a> {
BoundLocalSpawner { incoming: Rc::downgrade(&self.incoming) }
}

/// Run all tasks in the pool to completion.
Expand Down Expand Up @@ -362,7 +386,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
}
}

impl Spawn for LocalSpawner {
impl Spawn for BoundLocalSpawner<'_> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future.into());
Expand All @@ -381,7 +405,7 @@ impl Spawn for LocalSpawner {
}
}

impl LocalSpawn for LocalSpawner {
impl LocalSpawn for BoundLocalSpawner<'_> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Expand All @@ -399,3 +423,14 @@ impl LocalSpawn for LocalSpawner {
}
}
}

impl<'a> BoundLocalSpawn<'a> for BoundLocalSpawner<'a> {
fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
}
2 changes: 1 addition & 1 deletion futures-task/src/lib.rs
Expand Up @@ -16,7 +16,7 @@ extern crate alloc;
extern crate std;

mod spawn;
pub use crate::spawn::{LocalSpawn, Spawn, SpawnError};
pub use crate::spawn::{BoundLocalSpawn, LocalSpawn, Spawn, SpawnError};

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
Expand Down
117 changes: 30 additions & 87 deletions futures-task/src/spawn.rs
Expand Up @@ -51,6 +51,22 @@ pub trait LocalSpawn {
}
}

/// The `BoundLocalSpawn` is similar to [`LocalSpawn`], but allows spawning
/// futures that don't implement `Send` and have a lifetime that only needs to
/// exceed that of the associated executor.
pub trait BoundLocalSpawn<'a> {
/// Spawns a future that will be run to completion or until the executor is
/// dropped.
///
/// # Errors
///
/// The executor may be unable to spawn tasks. Spawn errors should
/// represent relatively rare scenarios, such as the executor
/// having been shut down so that it is no longer able to accept
/// tasks.
fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError>;
}

/// An error that occurred during spawning.
pub struct SpawnError {
_priv: (),
Expand Down Expand Up @@ -83,17 +99,10 @@ impl SpawnError {
}
}

impl<Sp: ?Sized + Spawn> Spawn for &Sp {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
}

fn status(&self) -> Result<(), SpawnError> {
Sp::status(self)
}
}

impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
impl<T, Sp: ?Sized + Spawn> Spawn for T
where
T: std::ops::Deref<Target = Sp>,
{
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
}
Expand All @@ -103,17 +112,10 @@ impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &Sp {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_local_obj(self, future)
}

fn status_local(&self) -> Result<(), SpawnError> {
Sp::status_local(self)
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
impl<T, Sp: ?Sized + LocalSpawn> LocalSpawn for T
where
T: std::ops::Deref<Target = Sp>,
{
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_local_obj(self, future)
}
Expand All @@ -123,70 +125,11 @@ impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
}
}

#[cfg(feature = "alloc")]
mod if_alloc {
use super::*;
use alloc::{boxed::Box, rc::Rc};

impl<Sp: ?Sized + Spawn> Spawn for Box<Sp> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_obj(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for Box<Sp> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_local_obj(future)
}

fn status_local(&self) -> Result<(), SpawnError> {
(**self).status_local()
}
}

impl<Sp: ?Sized + Spawn> Spawn for Rc<Sp> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_obj(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for Rc<Sp> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_local_obj(future)
}

fn status_local(&self) -> Result<(), SpawnError> {
(**self).status_local()
}
}

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
impl<Sp: ?Sized + Spawn> Spawn for alloc::sync::Arc<Sp> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_obj(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
impl<Sp: ?Sized + LocalSpawn> LocalSpawn for alloc::sync::Arc<Sp> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_local_obj(future)
}

fn status_local(&self) -> Result<(), SpawnError> {
(**self).status_local()
}
impl<'a, T, Sp: ?Sized + BoundLocalSpawn<'a>> BoundLocalSpawn<'a> for T
where
T: std::ops::Deref<Target = Sp>,
{
fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
Sp::spawn_bound_local_obj(self, future)
}
}
9 changes: 8 additions & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -17,7 +17,7 @@ use core::sync::atomic::{AtomicBool, AtomicPtr};
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_task::{BoundLocalSpawn, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};

mod abort;

Expand Down Expand Up @@ -78,6 +78,13 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
}
}

impl<'a> BoundLocalSpawn<'a> for FuturesUnordered<LocalFutureObj<'a, ()>> {
fn spawn_bound_local_obj(&self, future_obj: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
self.push(future_obj);
Ok(())
}
}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list allows for thread safe
Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/task/mod.rs
Expand Up @@ -13,7 +13,9 @@
#[doc(no_inline)]
pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj};
pub use futures_task::{
BoundLocalSpawn, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj,
};

pub use futures_task::noop_waker;
pub use futures_task::noop_waker_ref;
Expand All @@ -37,4 +39,4 @@ pub use futures_task::{waker_ref, WakerRef};
pub use futures_core::task::__internal::AtomicWaker;

mod spawn;
pub use self::spawn::{LocalSpawnExt, SpawnExt};
pub use self::spawn::{BoundLocalSpawnExt, LocalSpawnExt, SpawnExt};

0 comments on commit 81bcdac

Please sign in to comment.