Skip to content

Commit

Permalink
runtime: add OwnedTasks (#3909)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Jul 7, 2021
1 parent 51fad06 commit e2589a0
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 405 deletions.
98 changes: 19 additions & 79 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -2,16 +2,14 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::sync::notify::Notify;
use crate::util::linked_list::{Link, LinkedList};
use crate::util::{waker_ref, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
Expand Down Expand Up @@ -57,9 +55,6 @@ pub(crate) struct Spawner {
}

struct Tasks {
/// Collection of all active tasks spawned onto this executor.
owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,

/// Local run queue.
///
/// Tasks notified from the current thread are pushed into this queue.
Expand All @@ -69,23 +64,23 @@ struct Tasks {
/// A remote scheduler entry.
///
/// These are filled in by remote threads sending instructions to the scheduler.
enum Entry {
enum RemoteMsg {
/// A remote thread wants to spawn a task.
Schedule(task::Notified<Arc<Shared>>),
/// A remote thread wants a task to be released by the scheduler. We only
/// have access to its header.
Release(NonNull<task::Header>),
}

// Safety: Used correctly, the task header is "thread safe". Ultimately the task
// is owned by the current thread executor, for which this instruction is being
// sent.
unsafe impl Send for Entry {}
unsafe impl Send for RemoteMsg {}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue. None if the `Runtime` has been dropped.
queue: Mutex<Option<VecDeque<Entry>>>,
queue: Mutex<Option<VecDeque<RemoteMsg>>>,

/// Collection of all active tasks spawned onto this executor.
owned: OwnedTasks<Arc<Shared>>,

/// Unpark the blocked thread.
unpark: Box<dyn Unpark>,
Expand Down Expand Up @@ -125,14 +120,14 @@ impl<P: Park> BasicScheduler<P> {
let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
}),
};

let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
spawner: spawner.clone(),
Expand Down Expand Up @@ -227,15 +222,15 @@ impl<P: Park> Inner<P> {
.borrow_mut()
.queue
.pop_front()
.map(Entry::Schedule)
.map(RemoteMsg::Schedule)
})
} else {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(Entry::Schedule)
.map(RemoteMsg::Schedule)
.or_else(|| scheduler.spawner.pop())
};

Expand All @@ -251,26 +246,7 @@ impl<P: Park> Inner<P> {
};

match entry {
Entry::Schedule(task) => crate::coop::budget(|| task.run()),
Entry::Release(ptr) => {
// Safety: the task header is only legally provided
// internally in the header, so we know that it is a
// valid (or in particular *allocated*) header that
// is part of the linked list.
unsafe {
let removed = context.tasks.borrow_mut().owned.remove(ptr);

// TODO: This seems like it should hold, because
// there doesn't seem to be an avenue for anyone
// else to fiddle with the owned tasks
// collection *after* a remote thread has marked
// it as released, and at that point, the only
// location at which it can be removed is here
// or in the Drop implementation of the
// scheduler.
debug_assert!(removed.is_some());
}
}
RemoteMsg::Schedule(task) => crate::coop::budget(|| task.run()),
}
}

Expand Down Expand Up @@ -335,14 +311,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
};

enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
let task = match context.tasks.borrow_mut().owned.pop_back() {
Some(task) => task,
None => break,
};

while let Some(task) = context.shared.owned.pop_back() {
task.shutdown();
}

Expand All @@ -358,13 +327,9 @@ impl<P: Park> Drop for BasicScheduler<P> {
if let Some(remote_queue) = remote_queue.take() {
for entry in remote_queue {
match entry {
Entry::Schedule(task) => {
RemoteMsg::Schedule(task) => {
task.shutdown();
}
Entry::Release(..) => {
// Do nothing, each entry in the linked list was *just*
// dropped by the scheduler above.
}
}
}
}
Expand All @@ -375,7 +340,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
// The assert below is unrelated to this mutex.
drop(remote_queue);

assert!(context.tasks.borrow().owned.is_empty());
assert!(context.shared.owned.is_empty());
});
}
}
Expand All @@ -400,7 +365,7 @@ impl Spawner {
handle
}

fn pop(&self) -> Option<Entry> {
fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
None => None,
Expand Down Expand Up @@ -430,39 +395,14 @@ impl Schedule for Arc<Shared> {
fn bind(task: Task<Self>) -> Arc<Shared> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
cx.tasks.borrow_mut().owned.push_front(task);
cx.shared.owned.push_front(task);
cx.shared.clone()
})
}

fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
CURRENT.with(|maybe_cx| {
let ptr = NonNull::from(task.header());

if let Some(cx) = maybe_cx {
// safety: the task is inserted in the list in `bind`.
unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
} else {
// By sending an `Entry::Release` to the runtime, we ask the
// runtime to remove this task from the linked list in
// `Tasks::owned`.
//
// If the queue is `None`, then the task was already removed
// from that list in the destructor of `BasicScheduler`. We do
// not do anything in this case for the same reason that
// `Entry::Release` messages are ignored in the remote queue
// drain loop of `BasicScheduler`'s destructor.
if let Some(queue) = self.queue.lock().as_mut() {
queue.push_back(Entry::Release(ptr));
}

self.unpark.unpark();
// Returning `None` here prevents the task plumbing from being
// freed. It is then up to the scheduler through the queue we
// just added to, or its Drop impl to free the task.
None
}
})
// SAFETY: Inserted into the list in bind above.
unsafe { self.owned.remove(task) }
}

fn schedule(&self, task: task::Notified<Self>) {
Expand All @@ -473,7 +413,7 @@ impl Schedule for Arc<Shared> {
_ => {
let mut guard = self.queue.lock();
if let Some(queue) = guard.as_mut() {
queue.push_back(Entry::Schedule(task));
queue.push_back(RemoteMsg::Schedule(task));
drop(guard);
self.unpark.unpark();
} else {
Expand Down
11 changes: 0 additions & 11 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -66,9 +66,6 @@ pub(crate) struct Header {
/// Pointer to next task, used with the injection queue
pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,

/// Pointer to the next task in the transfer stack
pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,

/// Table of function pointers for executing actions on the task.
pub(super) vtable: &'static Vtable,

Expand Down Expand Up @@ -104,7 +101,6 @@ impl<T: Future, S: Schedule> Cell<T, S> {
state,
owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
stack_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
id,
Expand Down Expand Up @@ -299,13 +295,6 @@ impl<T: Future> CoreStage<T> {

cfg_rt_multi_thread! {
impl Header {
pub(crate) fn shutdown(&self) {
use crate::runtime::task::RawTask;

let task = unsafe { RawTask::from_raw(self.into()) };
task.shutdown();
}

pub(crate) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
self.queue_next.with_mut(|ptr| *ptr = next);
}
Expand Down
33 changes: 33 additions & 0 deletions tokio/src/runtime/task/list.rs
@@ -0,0 +1,33 @@
use crate::loom::sync::Mutex;
use crate::runtime::task::Task;
use crate::util::linked_list::{Link, LinkedList};

pub(crate) struct OwnedTasks<S: 'static> {
list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>,
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
list: Mutex::new(LinkedList::new()),
}
}

pub(crate) fn push_front(&self, task: Task<S>) {
self.list.lock().push_front(task);
}

pub(crate) fn pop_back(&self) -> Option<Task<S>> {
self.list.lock().pop_back()
}

/// The caller must ensure that if the provided task is stored in a
/// linked list, then it is in this linked list.
pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
self.list.lock().remove(task.header().into())
}

pub(crate) fn is_empty(&self) -> bool {
self.list.lock().is_empty()
}
}
15 changes: 6 additions & 9 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -13,6 +13,9 @@ mod join;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;

mod list;
pub(super) use self::list::OwnedTasks;

mod raw;
use self::raw::RawTask;

Expand All @@ -21,11 +24,6 @@ use self::state::State;

mod waker;

cfg_rt_multi_thread! {
mod stack;
pub(crate) use self::stack::TransferStack;
}

use crate::future::Future;
use crate::util::linked_list;

Expand Down Expand Up @@ -62,11 +60,10 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
fn bind(task: Task<Self>) -> Self;

/// The task has completed work and is ready to be released. The scheduler
/// is free to drop it whenever.
/// should release it immediately and return it. The task module will batch
/// the ref-dec with setting other options.
///
/// If the scheduler can immediately release the task, it should return
/// it as part of the function. This enables the task module to batch
/// the ref-dec with other options.
/// If the scheduler has already released the task, then None is returned.
fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;

/// Schedule the task
Expand Down

0 comments on commit e2589a0

Please sign in to comment.