Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework binding of new tasks #3955

Merged
merged 15 commits on Jul 20, 2021
28 changes: 12 additions & 16 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -311,6 +311,9 @@ impl<P: Park> Drop for BasicScheduler<P> {
};

enter(&mut inner, |scheduler, context| {
// By closing the OwnedTasks, no new tasks can be spawned on it.
context.shared.owned.close();
// Drain the OwnedTasks collection.
while let Some(task) = context.shared.owned.pop_back() {
task.shutdown();
}
Expand Down Expand Up @@ -354,14 +357,18 @@ impl<P: Park> fmt::Debug for BasicScheduler<P> {
// ===== impl Spawner =====

impl Spawner {
/// Spawns a future onto the thread pool
/// Spawns a future onto the basic scheduler
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.shared.schedule(task);
let (handle, notified) = self.shared.owned.bind(future, self.shared.clone());

if let Some(notified) = notified {
self.shared.schedule(notified);
}

handle
}

Expand Down Expand Up @@ -392,14 +399,6 @@ impl fmt::Debug for Spawner {
// ===== impl Shared =====

impl Schedule for Arc<Shared> {
fn bind(task: Task<Self>) -> Arc<Shared> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
cx.shared.owned.push_front(task);
cx.shared.clone()
})
}

fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
// SAFETY: Inserted into the list in bind above.
unsafe { self.owned.remove(task) }
Expand All @@ -411,16 +410,13 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
// If the queue is None, then the runtime has shut down. We
// don't need to do anything with the notification in that case.
let mut guard = self.queue.lock();
if let Some(queue) = guard.as_mut() {
queue.push_back(RemoteMsg::Schedule(task));
drop(guard);
self.unpark.unpark();
} else {
// The runtime has shut down. We drop the new task
// immediately.
drop(guard);
task.shutdown();
}
}
});
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/runtime/blocking/mod.rs
Expand Up @@ -8,7 +8,9 @@ pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};

mod schedule;
mod shutdown;
pub(crate) mod task;
mod task;
pub(crate) use schedule::NoopSchedule;
pub(crate) use task::BlockingTask;

use crate::runtime::Builder;

Expand Down
5 changes: 0 additions & 5 deletions tokio/src/runtime/blocking/schedule.rs
Expand Up @@ -9,11 +9,6 @@ use crate::runtime::task::{self, Task};
pub(crate) struct NoopSchedule;

impl task::Schedule for NoopSchedule {
fn bind(_task: Task<Self>) -> NoopSchedule {
// Do nothing w/ the task
NoopSchedule
}

fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/handle.rs
@@ -1,4 +1,4 @@
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::blocking::{BlockingTask, NoopSchedule};
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, context, driver, Spawner};
use crate::util::error::CONTEXT_MISSING_ERROR;
Expand Down Expand Up @@ -213,7 +213,7 @@ impl Handle {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::joinable(fut);
let (task, handle) = task::unowned(fut, NoopSchedule);
let _ = self.blocking_spawner.spawn(task, &self);
handle
}
Expand Down
9 changes: 2 additions & 7 deletions tokio/src/runtime/queue.rs
Expand Up @@ -106,13 +106,8 @@ impl<T> Local<T> {
break tail;
} else if steal != real {
// Concurrently stealing, this will free up capacity, so only
// push the new task onto the inject queue
//
// If the task fails to be pushed on the injection queue, there
// is nothing to be done at this point as the task cannot be a
// newly spawned task. Shutting down this task is handled by the
// worker shutdown process.
let _ = inject.push(task);
// push the task onto the inject queue
inject.push(task);
return;
} else {
// Push the current task and half of the queue into the
Expand Down
32 changes: 2 additions & 30 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -93,7 +93,7 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
pub(super) fn new(future: T, scheduler: S, state: State) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let id = future.id();
Box::new(Cell {
Expand All @@ -107,7 +107,7 @@ impl<T: Future, S: Schedule> Cell<T, S> {
},
core: Core {
scheduler: Scheduler {
scheduler: UnsafeCell::new(None),
scheduler: UnsafeCell::new(Some(scheduler)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ever None anymore?

If it is never None, you don't need to remove Option in this PR but we should clean it up in a follow up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally did not include that in this PR because the diff is already really long, and it wasn't that simple to remove the None case.

},
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
Expand All @@ -125,34 +125,6 @@ impl<S: Schedule> Scheduler<S> {
self.scheduler.with_mut(f)
}

/// Bind a scheduler to the task.
///
/// This only happens on the first poll and must be preceded by a call to
/// `is_bound` to determine if binding is appropriate or not.
///
/// # Safety
///
/// Binding must not be done concurrently since it will mutate the task
/// core through a shared reference.
pub(super) fn bind_scheduler(&self, task: Task<S>) {
// This function may be called concurrently, but the __first__ time it
// is called, the caller has unique access to this field. All subsequent
// concurrent calls will be via the `Waker`, which will "happens after"
// the first poll.
//
// In other words, it is always safe to read the field and it is safe to
// write to the field when it is `None`.
debug_assert!(!self.is_bound());

// Bind the task to the scheduler
let scheduler = S::bind(task);

// Safety: As `scheduler` is not set, this is the first poll
self.scheduler.with_mut(|ptr| unsafe {
*ptr = Some(scheduler);
});
}

/// Returns true if the task is bound to a scheduler.
pub(super) fn is_bound(&self) -> bool {
// Safety: never called concurrently w/ a mutation.
Expand Down
21 changes: 2 additions & 19 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -254,16 +254,13 @@ where
}

fn transition_to_running(&self) -> TransitionToRunning {
// If this is the first time the task is polled, the task will be bound
// to the scheduler, in which case the task ref count must be
// incremented.
let is_not_bound = !self.scheduler.is_bound();
debug_assert!(self.scheduler.is_bound());

// Transition the task to the running state.
//
// A failure to transition here indicates the task has been cancelled
// while in the run queue pending execution.
let snapshot = match self.header.state.transition_to_running(is_not_bound) {
let snapshot = match self.header.state.transition_to_running() {
Ok(snapshot) => snapshot,
Err(_) => {
// The task was shutdown while in the run queue. At this point,
Expand All @@ -273,20 +270,6 @@ where
}
};

if is_not_bound {
// Ensure the task is bound to a scheduler instance. Since this is
// the first time polling the task, a scheduler instance is pulled
// from the local context and assigned to the task.
//
// The scheduler maintains ownership of the task and responds to
// `wake` calls.
//
// The task reference count has been incremented.
//
// Safety: Since we have unique access to the task so that we can
// safely call `bind_scheduler`.
self.scheduler.bind_scheduler(self.to_task());
}
TransitionToRunning::Ok(snapshot)
}
}
Expand Down
9 changes: 3 additions & 6 deletions tokio/src/runtime/task/inject.rs
Expand Up @@ -75,15 +75,13 @@ impl<T: 'static> Inject<T> {

/// Pushes a value into the queue.
///
/// Returns `Err(task)` if pushing fails due to the queue being shutdown.
/// The caller is expected to call `shutdown()` on the task **if and only
/// if** it is a newly spawned task.
pub(crate) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>> {
/// This does nothing if the queue is closed.
pub(crate) fn push(&self, task: task::Notified<T>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the Result return not needed anymore?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I know, but I would like you to confirm it :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because the pushed task may be a new task, and if the push fails for a new task, the caller needs to know so it can call shutdown on the task. With this PR, that's no longer necessary as binding the new task happens before pushing the notification, and if binding the task succeeded, then the runtime has been given responsibility for cleaning the new task up, even if submitting a notification for the task fails.

It is what I described with the following in the original PR text:

The PR also introduces a conceptual change for the meaning of the Notified type — that type is now "just" a notification, and someone who holds a Notified is never responsible for cleaning up the task.

// Acquire queue lock
let mut p = self.pointers.lock();

if p.is_closed {
return Err(task);
return;
}

// safety: only mutated with the lock held
Expand All @@ -102,7 +100,6 @@ impl<T: 'static> Inject<T> {
p.tail = Some(task);

self.len.store(len + 1, Release);
Ok(())
}

/// Pushes several values into the queue.
Expand Down
124 changes: 116 additions & 8 deletions tokio/src/runtime/task/list.rs
@@ -1,33 +1,141 @@
//! This module has containers for storing the tasks spawned on a scheduler. The
//! `OwnedTasks` container is thread-safe but can only store tasks that
//! implement Send. The `LocalOwnedTasks` container is not thread safe, but can
//! store non-Send tasks.
//!
//! The collections can be closed to prevent adding new tasks during shutdown of
//! the scheduler with the collection.

use crate::future::Future;
use crate::loom::sync::Mutex;
use crate::runtime::task::Task;
use crate::runtime::task::{JoinHandle, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};

use std::marker::PhantomData;

pub(crate) struct OwnedTasks<S: 'static> {
list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>,
inner: Mutex<OwnedTasksInner<S>>,
}
struct OwnedTasksInner<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

pub(crate) struct LocalOwnedTasks<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
_not_send: PhantomData<*const ()>,
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
list: Mutex::new(LinkedList::new()),
inner: Mutex::new(OwnedTasksInner {
list: LinkedList::new(),
closed: false,
}),
}
}

pub(crate) fn push_front(&self, task: Task<S>) {
self.list.lock().push_front(task);
/// Bind the provided task to this OwnedTasks instance. This fails if the
/// OwnedTasks has been closed.
pub(crate) fn bind<T>(
&self,
task: T,
scheduler: S,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = super::new_task(task, scheduler);

let mut lock = self.inner.lock();
if lock.closed {
drop(lock);
drop(task);
notified.shutdown();
(join, None)
} else {
lock.list.push_front(task);
(join, Some(notified))
}
}

pub(crate) fn pop_back(&self) -> Option<Task<S>> {
self.list.lock().pop_back()
self.inner.lock().list.pop_back()
}

/// The caller must ensure that if the provided task is stored in a
/// linked list, then it is in this linked list.
pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
self.list.lock().remove(task.header().into())
self.inner.lock().list.remove(task.header().into())
}

pub(crate) fn is_empty(&self) -> bool {
self.inner.lock().list.is_empty()
}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn is_closed(&self) -> bool {
self.inner.lock().closed
}

/// Close the OwnedTasks. This prevents adding new tasks to the collection.
pub(crate) fn close(&self) {
self.inner.lock().closed = true;
}
}

impl<S: 'static> LocalOwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
list: LinkedList::new(),
closed: false,
_not_send: PhantomData,
}
}

pub(crate) fn bind<T>(
&mut self,
task: T,
scheduler: S,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let (task, notified, join) = super::new_task(task, scheduler);

if self.closed {
drop(task);
notified.shutdown();
(join, None)
} else {
self.list.push_front(task);
(join, Some(notified))
}
}

pub(crate) fn pop_back(&mut self) -> Option<Task<S>> {
self.list.pop_back()
}

/// The caller must ensure that if the provided task is stored in a
/// linked list, then it is in this linked list.
pub(crate) unsafe fn remove(&mut self, task: &Task<S>) -> Option<Task<S>> {
self.list.remove(task.header().into())
}

pub(crate) fn is_empty(&self) -> bool {
self.list.lock().is_empty()
self.list.is_empty()
}

/// Close the LocalOwnedTasks. This prevents adding new tasks to the
/// collection.
pub(crate) fn close(&mut self) {
self.closed = true;
}
}