runtime: rework binding of new tasks (#3955)
Darksonn committed Jul 20, 2021
1 parent 5ae2855 commit 2087f3e
Showing 20 changed files with 504 additions and 271 deletions.
28 changes: 12 additions & 16 deletions tokio/src/runtime/basic_scheduler.rs
@@ -311,6 +311,9 @@ impl<P: Park> Drop for BasicScheduler<P> {
};

enter(&mut inner, |scheduler, context| {
// By closing the OwnedTasks, no new tasks can be spawned on it.
context.shared.owned.close();
// Drain the OwnedTasks collection.
while let Some(task) = context.shared.owned.pop_back() {
task.shutdown();
}
@@ -354,14 +357,18 @@ impl<P: Park> fmt::Debug for BasicScheduler<P> {
// ===== impl Spawner =====

impl Spawner {
/// Spawns a future onto the thread pool
/// Spawns a future onto the basic scheduler
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.shared.schedule(task);
let (handle, notified) = self.shared.owned.bind(future, self.shared.clone());

if let Some(notified) = notified {
self.shared.schedule(notified);
}

handle
}

@@ -392,14 +399,6 @@ impl fmt::Debug for Spawner {
// ===== impl Shared =====

impl Schedule for Arc<Shared> {
fn bind(task: Task<Self>) -> Arc<Shared> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
cx.shared.owned.push_front(task);
cx.shared.clone()
})
}

fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
// SAFETY: Inserted into the list in bind above.
unsafe { self.owned.remove(task) }
@@ -411,16 +410,13 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
// If the queue is None, then the runtime has shut down. We
// don't need to do anything with the notification in that case.
let mut guard = self.queue.lock();
if let Some(queue) = guard.as_mut() {
queue.push_back(RemoteMsg::Schedule(task));
drop(guard);
self.unpark.unpark();
} else {
// The runtime has shut down. We drop the new task
// immediately.
drop(guard);
task.shutdown();
}
}
});
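
For orientation, here is a minimal sketch of the spawn flow that the reworked `Spawner::spawn` above follows. The names are hypothetical stand-ins (`JoinHandleSketch`, `NotifiedSketch`, `bind_sketch`), not tokio's internal types: binding yields the join handle plus an optional notification, and the task is only handed to the scheduler when binding succeeded, i.e. when the owned-task collection was not yet closed.

```rust
// Hypothetical stand-ins for illustration only; tokio's real JoinHandle and
// Notified types carry the actual task pointer and state.
struct JoinHandleSketch;
struct NotifiedSketch;

// Mirrors the shape of OwnedTasks::bind: when the collection is closed, the
// task is shut down immediately and no notification is handed back.
fn bind_sketch(collection_closed: bool) -> (JoinHandleSketch, Option<NotifiedSketch>) {
    if collection_closed {
        (JoinHandleSketch, None)
    } else {
        (JoinHandleSketch, Some(NotifiedSketch))
    }
}

fn spawn_sketch(collection_closed: bool) -> JoinHandleSketch {
    let (handle, notified) = bind_sketch(collection_closed);
    if let Some(_notified) = notified {
        // Only a successfully bound task would be pushed onto the run queue here.
    }
    handle
}

fn main() {
    let _h1 = spawn_sketch(false); // runtime live: the task gets scheduled
    let _h2 = spawn_sketch(true);  // shutdown in progress: the task is never scheduled
}
```
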
4 changes: 3 additions & 1 deletion tokio/src/runtime/blocking/mod.rs
@@ -8,7 +8,9 @@ pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};

mod schedule;
mod shutdown;
pub(crate) mod task;
mod task;
pub(crate) use schedule::NoopSchedule;
pub(crate) use task::BlockingTask;

use crate::runtime::Builder;

5 changes: 0 additions & 5 deletions tokio/src/runtime/blocking/schedule.rs
@@ -9,11 +9,6 @@ use crate::runtime::task::{self, Task};
pub(crate) struct NoopSchedule;

impl task::Schedule for NoopSchedule {
fn bind(_task: Task<Self>) -> NoopSchedule {
// Do nothing w/ the task
NoopSchedule
}

fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}
4 changes: 2 additions & 2 deletions tokio/src/runtime/handle.rs
@@ -1,4 +1,4 @@
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::blocking::{BlockingTask, NoopSchedule};
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, context, driver, Spawner};
use crate::util::error::CONTEXT_MISSING_ERROR;
@@ -213,7 +213,7 @@ impl Handle {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::joinable(fut);
let (task, handle) = task::unowned(fut, NoopSchedule);
let _ = self.blocking_spawner.spawn(task, &self);
handle
}
9 changes: 2 additions & 7 deletions tokio/src/runtime/queue.rs
@@ -106,13 +106,8 @@ impl<T> Local<T> {
break tail;
} else if steal != real {
// Concurrently stealing, this will free up capacity, so only
// push the new task onto the inject queue
//
// If the task fails to be pushed on the injection queue, there
// is nothing to be done at this point as the task cannot be a
// newly spawned task. Shutting down this task is handled by the
// worker shutdown process.
let _ = inject.push(task);
// push the task onto the inject queue
inject.push(task);
return;
} else {
// Push the current task and half of the queue into the
32 changes: 2 additions & 30 deletions tokio/src/runtime/task/core.rs
@@ -93,7 +93,7 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
pub(super) fn new(future: T, scheduler: S, state: State) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let id = future.id();
Box::new(Cell {
@@ -107,7 +107,7 @@ impl<T: Future, S: Schedule> Cell<T, S> {
},
core: Core {
scheduler: Scheduler {
scheduler: UnsafeCell::new(None),
scheduler: UnsafeCell::new(Some(scheduler)),
},
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
@@ -125,34 +125,6 @@ impl<S: Schedule> Scheduler<S> {
self.scheduler.with_mut(f)
}

/// Bind a scheduler to the task.
///
/// This only happens on the first poll and must be preceded by a call to
/// `is_bound` to determine if binding is appropriate or not.
///
/// # Safety
///
/// Binding must not be done concurrently since it will mutate the task
/// core through a shared reference.
pub(super) fn bind_scheduler(&self, task: Task<S>) {
// This function may be called concurrently, but the __first__ time it
// is called, the caller has unique access to this field. All subsequent
// concurrent calls will be via the `Waker`, which will "happens after"
// the first poll.
//
// In other words, it is always safe to read the field and it is safe to
// write to the field when it is `None`.
debug_assert!(!self.is_bound());

// Bind the task to the scheduler
let scheduler = S::bind(task);

// Safety: As `scheduler` is not set, this is the first poll
self.scheduler.with_mut(|ptr| unsafe {
*ptr = Some(scheduler);
});
}

/// Returns true if the task is bound to a scheduler.
pub(super) fn is_bound(&self) -> bool {
// Safety: never called concurrently w/ a mutation.
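
A minimal sketch of what the constructor change in `Cell::new` means, using a hypothetical `CoreSketch` rather than tokio's real `Cell`/`Core`/`Scheduler` types: the scheduler slot is filled when the task is created, so the lazy first-poll `bind_scheduler` step and its concurrency argument disappear.

```rust
use std::cell::UnsafeCell;

// Hypothetical simplified core; the real Cell also holds the task header,
// trailer, state, and the future's stage.
struct CoreSketch<S> {
    scheduler: UnsafeCell<Option<S>>,
}

impl<S> CoreSketch<S> {
    // After this commit the scheduler is supplied when the task is created.
    fn new(scheduler: S) -> Self {
        CoreSketch {
            scheduler: UnsafeCell::new(Some(scheduler)),
        }
    }

    fn is_bound(&self) -> bool {
        // Fine in this single-threaded sketch: the read cannot race with a write.
        unsafe { (*self.scheduler.get()).is_some() }
    }
}

fn main() {
    let core = CoreSketch::new("scheduler handle");
    // The task is bound from the moment it exists; no first-poll binding step.
    assert!(core.is_bound());
}
```
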
21 changes: 2 additions & 19 deletions tokio/src/runtime/task/harness.rs
@@ -254,16 +254,13 @@
}

fn transition_to_running(&self) -> TransitionToRunning {
// If this is the first time the task is polled, the task will be bound
// to the scheduler, in which case the task ref count must be
// incremented.
let is_not_bound = !self.scheduler.is_bound();
debug_assert!(self.scheduler.is_bound());

// Transition the task to the running state.
//
// A failure to transition here indicates the task has been cancelled
// while in the run queue pending execution.
let snapshot = match self.header.state.transition_to_running(is_not_bound) {
let snapshot = match self.header.state.transition_to_running() {
Ok(snapshot) => snapshot,
Err(_) => {
// The task was shutdown while in the run queue. At this point,
@@ -273,20 +270,6 @@
}
};

if is_not_bound {
// Ensure the task is bound to a scheduler instance. Since this is
// the first time polling the task, a scheduler instance is pulled
// from the local context and assigned to the task.
//
// The scheduler maintains ownership of the task and responds to
// `wake` calls.
//
// The task reference count has been incremented.
//
// Safety: Since we have unique access to the task so that we can
// safely call `bind_scheduler`.
self.scheduler.bind_scheduler(self.to_task());
}
TransitionToRunning::Ok(snapshot)
}
}
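
To make the harness simplification concrete, a toy model (hypothetical names, not tokio's packed atomic state machine): since every task is now bound at creation time, the first-poll branch collapses to a debug assertion, and only the cancelled-while-queued case remains.

```rust
// Hypothetical simplification of the poll-time transition.
enum TransitionSketch {
    Ok,
    Cancelled,
}

fn transition_to_running_sketch(is_bound: bool, cancelled_in_queue: bool) -> TransitionSketch {
    // After this commit the scheduler is always bound before the first poll.
    debug_assert!(is_bound);

    if cancelled_in_queue {
        // The task was shut down while waiting in the run queue.
        TransitionSketch::Cancelled
    } else {
        TransitionSketch::Ok
    }
}

fn main() {
    assert!(matches!(
        transition_to_running_sketch(true, false),
        TransitionSketch::Ok
    ));
    assert!(matches!(
        transition_to_running_sketch(true, true),
        TransitionSketch::Cancelled
    ));
}
```
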
9 changes: 3 additions & 6 deletions tokio/src/runtime/task/inject.rs
@@ -75,15 +75,13 @@ impl<T: 'static> Inject<T> {

/// Pushes a value into the queue.
///
/// Returns `Err(task)` if pushing fails due to the queue being shutdown.
/// The caller is expected to call `shutdown()` on the task **if and only
/// if** it is a newly spawned task.
pub(crate) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>> {
/// This does nothing if the queue is closed.
pub(crate) fn push(&self, task: task::Notified<T>) {
// Acquire queue lock
let mut p = self.pointers.lock();

if p.is_closed {
return Err(task);
return;
}

// safety: only mutated with the lock held
@@ -102,7 +100,6 @@
p.tail = Some(task);

self.len.store(len + 1, Release);
Ok(())
}

/// Pushes several values into the queue.
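
A small sketch of the new `push` behavior, using an invented `InjectSketch` rather than tokio's intrusive inject queue: pushing into a closed queue is now a silent no-op instead of returning `Err(task)`, which presumably works because rejection of newly spawned tasks already happens earlier, at `OwnedTasks::bind` time, and already-bound tasks are cleaned up by the scheduler's shutdown path.

```rust
use std::sync::Mutex;

// Hypothetical simplified inject queue; the real one is an intrusive linked
// list guarded by a lock plus an atomic length counter.
struct InjectSketch<T> {
    pointers: Mutex<InjectInner<T>>,
}

struct InjectInner<T> {
    is_closed: bool,
    queue: Vec<T>,
}

impl<T> InjectSketch<T> {
    fn new() -> Self {
        Self {
            pointers: Mutex::new(InjectInner {
                is_closed: false,
                queue: Vec::new(),
            }),
        }
    }

    // Mirrors the new signature: no Result, pushing into a closed queue does nothing.
    fn push(&self, task: T) {
        let mut p = self.pointers.lock().unwrap();
        if p.is_closed {
            return;
        }
        p.queue.push(task);
    }

    fn close(&self) {
        self.pointers.lock().unwrap().is_closed = true;
    }

    fn len(&self) -> usize {
        self.pointers.lock().unwrap().queue.len()
    }
}

fn main() {
    let inject = InjectSketch::new();
    inject.push(1);
    inject.close();
    inject.push(2); // silently dropped: the queue is closed
    assert_eq!(inject.len(), 1);
}
```
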
124 changes: 116 additions & 8 deletions tokio/src/runtime/task/list.rs
@@ -1,33 +1,141 @@
//! This module has containers for storing the tasks spawned on a scheduler. The
//! `OwnedTasks` container is thread-safe but can only store tasks that
//! implement Send. The `LocalOwnedTasks` container is not thread safe, but can
//! store non-Send tasks.
//!
//! The collections can be closed to prevent adding new tasks during shutdown of
//! the scheduler with the collection.

use crate::future::Future;
use crate::loom::sync::Mutex;
use crate::runtime::task::Task;
use crate::runtime::task::{JoinHandle, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};

use std::marker::PhantomData;

pub(crate) struct OwnedTasks<S: 'static> {
list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>,
inner: Mutex<OwnedTasksInner<S>>,
}
struct OwnedTasksInner<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

pub(crate) struct LocalOwnedTasks<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
_not_send: PhantomData<*const ()>,
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
list: Mutex::new(LinkedList::new()),
inner: Mutex::new(OwnedTasksInner {
list: LinkedList::new(),
closed: false,
}),
}
}

pub(crate) fn push_front(&self, task: Task<S>) {
self.list.lock().push_front(task);
/// Bind the provided task to this OwnedTasks instance. This fails if the
/// OwnedTasks has been closed.
pub(crate) fn bind<T>(
&self,
task: T,
scheduler: S,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = super::new_task(task, scheduler);

let mut lock = self.inner.lock();
if lock.closed {
drop(lock);
drop(task);
notified.shutdown();
(join, None)
} else {
lock.list.push_front(task);
(join, Some(notified))
}
}

pub(crate) fn pop_back(&self) -> Option<Task<S>> {
self.list.lock().pop_back()
self.inner.lock().list.pop_back()
}

/// The caller must ensure that if the provided task is stored in a
/// linked list, then it is in this linked list.
pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
self.list.lock().remove(task.header().into())
self.inner.lock().list.remove(task.header().into())
}

pub(crate) fn is_empty(&self) -> bool {
self.inner.lock().list.is_empty()
}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn is_closed(&self) -> bool {
self.inner.lock().closed
}

/// Close the OwnedTasks. This prevents adding new tasks to the collection.
pub(crate) fn close(&self) {
self.inner.lock().closed = true;
}
}

impl<S: 'static> LocalOwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
list: LinkedList::new(),
closed: false,
_not_send: PhantomData,
}
}

pub(crate) fn bind<T>(
&mut self,
task: T,
scheduler: S,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let (task, notified, join) = super::new_task(task, scheduler);

if self.closed {
drop(task);
notified.shutdown();
(join, None)
} else {
self.list.push_front(task);
(join, Some(notified))
}
}

pub(crate) fn pop_back(&mut self) -> Option<Task<S>> {
self.list.pop_back()
}

/// The caller must ensure that if the provided task is stored in a
/// linked list, then it is in this linked list.
pub(crate) unsafe fn remove(&mut self, task: &Task<S>) -> Option<Task<S>> {
self.list.remove(task.header().into())
}

pub(crate) fn is_empty(&self) -> bool {
self.list.lock().is_empty()
self.list.is_empty()
}

/// Close the LocalOwnedTasks. This prevents adding new tasks to the
/// collection.
pub(crate) fn close(&mut self) {
self.closed = true;
}
}
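
To make the bind/close interplay in `list.rs` concrete, here is a small self-contained sketch. `OwnedTasksSketch` is a hypothetical simplification (a `VecDeque` behind a mutex, not the real intrusive linked list of task headers), showing the shutdown sequence the schedulers now use: close the collection so `bind` starts rejecting new tasks, then drain and shut down whatever remains.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical simplified task; the real Task<S> is a reference-counted raw
// pointer to the task cell.
struct Task(&'static str);

struct OwnedTasksSketch {
    inner: Mutex<Inner>,
}

struct Inner {
    list: VecDeque<Task>,
    closed: bool,
}

impl OwnedTasksSketch {
    fn new() -> Self {
        Self {
            inner: Mutex::new(Inner {
                list: VecDeque::new(),
                closed: false,
            }),
        }
    }

    // Mirrors the new bind: returns false when the collection is closed, in
    // which case the caller shuts the task down instead of scheduling it.
    fn bind(&self, task: Task) -> bool {
        let mut inner = self.inner.lock().unwrap();
        if inner.closed {
            false
        } else {
            inner.list.push_front(task);
            true
        }
    }

    fn close(&self) {
        self.inner.lock().unwrap().closed = true;
    }

    fn pop_back(&self) -> Option<Task> {
        self.inner.lock().unwrap().list.pop_back()
    }
}

fn main() {
    let owned = OwnedTasksSketch::new();
    assert!(owned.bind(Task("spawned before shutdown")));

    // Shutdown: close first so no new task can slip in, then drain the rest.
    owned.close();
    assert!(!owned.bind(Task("spawned during shutdown")));
    while let Some(Task(name)) = owned.pop_back() {
        println!("shutting down: {}", name);
    }
}
```
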

1 comment on commit 2087f3e

@github-actions

⚠️ Performance Alert ⚠️

A possible performance regression was detected for benchmark 'sync_mpsc'.
The benchmark result of this commit is worse than the previous result, exceeding the alert threshold of 2.

| Benchmark suite | Current: 2087f3e | Previous: 2520048 | Ratio |
| --- | --- | --- | --- |
| send_large | 64390 ns/iter (± 6099) | 29702 ns/iter (± 338) | 2.17 |

This comment was automatically generated by a workflow using github-action-benchmark.

CC: @tokio-rs/maintainers
