Add OwnedTasks #3909

Merged: 8 commits, Jul 7, 2021
98 changes: 19 additions & 79 deletions tokio/src/runtime/basic_scheduler.rs
@@ -2,16 +2,14 @@ use crate::future::poll_fn;
 use crate::loom::sync::atomic::AtomicBool;
 use crate::loom::sync::Mutex;
 use crate::park::{Park, Unpark};
-use crate::runtime::task::{self, JoinHandle, Schedule, Task};
+use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
 use crate::sync::notify::Notify;
-use crate::util::linked_list::{Link, LinkedList};
 use crate::util::{waker_ref, Wake, WakerRef};

 use std::cell::RefCell;
 use std::collections::VecDeque;
 use std::fmt;
 use std::future::Future;
-use std::ptr::NonNull;
 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
 use std::sync::Arc;
 use std::task::Poll::{Pending, Ready};
@@ -57,9 +55,6 @@ pub(crate) struct Spawner {
 }

 struct Tasks {
-    /// Collection of all active tasks spawned onto this executor.
-    owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,
-
     /// Local run queue.
     ///
     /// Tasks notified from the current thread are pushed into this queue.
@@ -69,23 +64,23 @@ struct Tasks {
 /// A remote scheduler entry.
 ///
 /// These are filled in by remote threads sending instructions to the scheduler.
-enum Entry {
+enum RemoteMsg {
     /// A remote thread wants to spawn a task.
     Schedule(task::Notified<Arc<Shared>>),
[Review comment from a Member, on lines +67 to 69]
Was the intention behind leaving this an enum just to keep the diff smaller? It seems like it could be changed to a Schedule struct, just making the run queue a VecDeque<Schedule>...
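For illustration, a minimal sketch of the refactor this comment suggests (hypothetical and not part of this PR; Notified stands in for the private task::Notified<Arc<Shared>> type):

use std::collections::VecDeque;
use std::sync::Mutex;

// Stand-in for the private task::Notified<Arc<Shared>> type.
struct Notified;

// The single-variant enum collapses into a newtype...
struct Schedule(Notified);

// ...and the remote run queue stores it directly, mirroring the
// Shared struct in the diff below.
struct Shared {
    queue: Mutex<Option<VecDeque<Schedule>>>,
}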

-    /// A remote thread wants a task to be released by the scheduler. We only
-    /// have access to its header.
-    Release(NonNull<task::Header>),
 }

 // Safety: Used correctly, the task header is "thread safe". Ultimately the task
 // is owned by the current thread executor, for which this instruction is being
 // sent.
-unsafe impl Send for Entry {}
+unsafe impl Send for RemoteMsg {}

 /// Scheduler state shared between threads.
 struct Shared {
     /// Remote run queue. None if the `Runtime` has been dropped.
-    queue: Mutex<Option<VecDeque<Entry>>>,
+    queue: Mutex<Option<VecDeque<RemoteMsg>>>,

+    /// Collection of all active tasks spawned onto this executor.
+    owned: OwnedTasks<Arc<Shared>>,
+
     /// Unpark the blocked thread.
     unpark: Box<dyn Unpark>,
@@ -125,14 +120,14 @@ impl<P: Park> BasicScheduler<P> {
         let spawner = Spawner {
             shared: Arc::new(Shared {
                 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
+                owned: OwnedTasks::new(),
                 unpark: unpark as Box<dyn Unpark>,
                 woken: AtomicBool::new(false),
             }),
         };

         let inner = Mutex::new(Some(Inner {
             tasks: Some(Tasks {
-                owned: LinkedList::new(),
                 queue: VecDeque::with_capacity(INITIAL_CAPACITY),
             }),
             spawner: spawner.clone(),
@@ -227,15 +222,15 @@ impl<P: Park> Inner<P> {
                             .borrow_mut()
                             .queue
                             .pop_front()
-                            .map(Entry::Schedule)
+                            .map(RemoteMsg::Schedule)
                     })
                 } else {
                     context
                         .tasks
                         .borrow_mut()
                         .queue
                         .pop_front()
-                        .map(Entry::Schedule)
+                        .map(RemoteMsg::Schedule)
                         .or_else(|| scheduler.spawner.pop())
                 };

@@ -251,26 +246,7 @@
                };

                match entry {
-                    Entry::Schedule(task) => crate::coop::budget(|| task.run()),
-                    Entry::Release(ptr) => {
-                        // Safety: the task header is only legally provided
-                        // internally in the header, so we know that it is a
-                        // valid (or in particular *allocated*) header that
-                        // is part of the linked list.
-                        unsafe {
-                            let removed = context.tasks.borrow_mut().owned.remove(ptr);
-
-                            // TODO: This seems like it should hold, because
-                            // there doesn't seem to be an avenue for anyone
-                            // else to fiddle with the owned tasks
-                            // collection *after* a remote thread has marked
-                            // it as released, and at that point, the only
-                            // location at which it can be removed is here
-                            // or in the Drop implementation of the
-                            // scheduler.
-                            debug_assert!(removed.is_some());
-                        }
-                    }
+                    RemoteMsg::Schedule(task) => crate::coop::budget(|| task.run()),
                }
            }

@@ -335,14 +311,7 @@ impl<P: Park> Drop for BasicScheduler<P> {
        };

        enter(&mut inner, |scheduler, context| {
-            // Loop required here to ensure borrow is dropped between iterations
-            #[allow(clippy::while_let_loop)]
-            loop {
-                let task = match context.tasks.borrow_mut().owned.pop_back() {
-                    Some(task) => task,
-                    None => break,
-                };
-
+            while let Some(task) = context.shared.owned.pop_back() {
                task.shutdown();
            }

@@ -358,13 +327,9 @@
            if let Some(remote_queue) = remote_queue.take() {
                for entry in remote_queue {
                    match entry {
-                        Entry::Schedule(task) => {
+                        RemoteMsg::Schedule(task) => {
                            task.shutdown();
                        }
-                        Entry::Release(..) => {
-                            // Do nothing, each entry in the linked list was *just*
-                            // dropped by the scheduler above.
-                        }
                    }
                }
            }
@@ -375,7 +340,7 @@
            // The assert below is unrelated to this mutex.
            drop(remote_queue);

-            assert!(context.tasks.borrow().owned.is_empty());
+            assert!(context.shared.owned.is_empty());
        });
    }
 }
@@ -400,7 +365,7 @@ impl Spawner {
        handle
    }

-    fn pop(&self) -> Option<Entry> {
+    fn pop(&self) -> Option<RemoteMsg> {
        match self.shared.queue.lock().as_mut() {
            Some(queue) => queue.pop_front(),
            None => None,
@@ -430,39 +395,14 @@ impl Schedule for Arc<Shared> {
    fn bind(task: Task<Self>) -> Arc<Shared> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");
-            cx.tasks.borrow_mut().owned.push_front(task);
+            cx.shared.owned.push_front(task);
            cx.shared.clone()
        })
    }

    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
-        CURRENT.with(|maybe_cx| {
-            let ptr = NonNull::from(task.header());
-
-            if let Some(cx) = maybe_cx {
-                // safety: the task is inserted in the list in `bind`.
-                unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
-            } else {
-                // By sending an `Entry::Release` to the runtime, we ask the
-                // runtime to remove this task from the linked list in
-                // `Tasks::owned`.
-                //
-                // If the queue is `None`, then the task was already removed
-                // from that list in the destructor of `BasicScheduler`. We do
-                // not do anything in this case for the same reason that
-                // `Entry::Release` messages are ignored in the remote queue
-                // drain loop of `BasicScheduler`'s destructor.
-                if let Some(queue) = self.queue.lock().as_mut() {
-                    queue.push_back(Entry::Release(ptr));
-                }
-
-                self.unpark.unpark();
-                // Returning `None` here prevents the task plumbing from being
-                // freed. It is then up to the scheduler through the queue we
-                // just added to, or its Drop impl to free the task.
-                None
-            }
-        })
+        // SAFETY: Inserted into the list in bind above.
+        unsafe { self.owned.remove(task) }
    }

    fn schedule(&self, task: task::Notified<Self>) {
@@ -473,7 +413,7 @@
            _ => {
                let mut guard = self.queue.lock();
                if let Some(queue) = guard.as_mut() {
-                    queue.push_back(Entry::Schedule(task));
+                    queue.push_back(RemoteMsg::Schedule(task));
                    drop(guard);
                    self.unpark.unpark();
                } else {
11 changes: 0 additions & 11 deletions tokio/src/runtime/task/core.rs
@@ -66,9 +66,6 @@ pub(crate) struct Header {
    /// Pointer to next task, used with the injection queue
    pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,

-    /// Pointer to the next task in the transfer stack
-    pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,
-
    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,

@@ -104,7 +101,6 @@ impl<T: Future, S: Schedule> Cell<T, S> {
                state,
                owned: UnsafeCell::new(linked_list::Pointers::new()),
                queue_next: UnsafeCell::new(None),
-                stack_next: UnsafeCell::new(None),
                vtable: raw::vtable::<T, S>(),
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                id,
@@ -299,13 +295,6 @@ impl<T: Future> CoreStage<T> {

 cfg_rt_multi_thread! {
    impl Header {
-        pub(crate) fn shutdown(&self) {
-            use crate::runtime::task::RawTask;
-
-            let task = unsafe { RawTask::from_raw(self.into()) };
-            task.shutdown();
-        }
-
        pub(crate) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
            self.queue_next.with_mut(|ptr| *ptr = next);
        }
33 changes: 33 additions & 0 deletions tokio/src/runtime/task/list.rs
@@ -0,0 +1,33 @@
+use crate::loom::sync::Mutex;
+use crate::runtime::task::Task;
+use crate::util::linked_list::{Link, LinkedList};
+
+pub(crate) struct OwnedTasks<S: 'static> {
+    list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>,
+}
+
+impl<S: 'static> OwnedTasks<S> {
+    pub(crate) fn new() -> Self {
+        Self {
+            list: Mutex::new(LinkedList::new()),
+        }
+    }
+
+    pub(crate) fn push_front(&self, task: Task<S>) {
+        self.list.lock().push_front(task);
+    }
+
+    pub(crate) fn pop_back(&self) -> Option<Task<S>> {
+        self.list.lock().pop_back()
+    }
+
+    /// The caller must ensure that if the provided task is stored in a
+    /// linked list, then it is in this linked list.
+    pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
+        self.list.lock().remove(task.header().into())
+    }
[Review comment from the PR author (Contributor), on lines +24 to +28]
We had discussed introducing a field in the task header that remembers which OwnedTasks the task is in, to make these operations safe. However, that seems somewhat difficult: e.g. if a task is concurrently inserted into two OwnedTasks structures, there is a race condition on the field remembering the container.
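As a toy illustration of that race (hypothetical types, not tokio code): if the header carried a late-written owner field, two concurrent insertions would leave it naming only one of the two lists.

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical header field recording which OwnedTasks owns the task.
struct Header {
    owner_id: AtomicUsize,
}

impl Header {
    // If two threads insert the same task into different lists, these
    // stores race: the last writer wins, so one list can end up holding
    // the task while owner_id names the other, and a later remove()
    // would consult the wrong list.
    fn set_owner(&self, list_id: usize) {
        self.owner_id.store(list_id, Ordering::Release);
    }
}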

[Review comment from @carllerche (Member), Jul 2, 2021]
Right, which would be solved if the only way to get a Task structure is via the OwnedTasks value:

owned_tasks.insert(async { ... }) -> Task<_>;

By doing this, the pointer to the OwnedTasks in the task header is set on creation and never changed. This can be done in a follow-up API, though; we can keep remove unsafe for now.
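A minimal compilable sketch of that proposed shape (stand-in types and a hypothetical signature; this is not the API added in this PR):

use std::future::Future;
use std::marker::PhantomData;

// Stand-ins for tokio's private task types.
pub struct Task<S: 'static>(PhantomData<S>);
pub struct OwnedTasks<S: 'static>(PhantomData<S>);

impl<S: 'static> OwnedTasks<S> {
    // The only way to obtain a Task is through the OwnedTasks that will
    // own it, so the owner recorded in the task header is written once,
    // at creation, and never changes afterwards.
    pub fn insert<F: Future>(&self, _future: F) -> Task<S> {
        // Would allocate the task, record `self` in its header, and push
        // it onto the internal list before returning the handle.
        Task(PhantomData)
    }
}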


+    pub(crate) fn is_empty(&self) -> bool {
+        self.list.lock().is_empty()
+    }
+}
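The point of this new file is that the list locks internally: push_front, pop_back, and is_empty all take &self, which is what lets the owned collection move from the thread-local Tasks struct into the thread-safe Shared struct above. A self-contained toy analog of the same surface (not tokio's types; a Vec instead of an intrusive linked list):

use std::sync::{Arc, Mutex};
use std::thread;

// Toy stand-in for OwnedTasks: an internally locked list.
struct Owned<T> {
    list: Mutex<Vec<T>>,
}

impl<T> Owned<T> {
    fn new() -> Self {
        Self { list: Mutex::new(Vec::new()) }
    }

    // &self rather than &mut self: the internal lock makes this safe to
    // call from any thread.
    fn push_front(&self, value: T) {
        self.list.lock().unwrap().insert(0, value);
    }

    fn pop_back(&self) -> Option<T> {
        self.list.lock().unwrap().pop()
    }

    fn is_empty(&self) -> bool {
        self.list.lock().unwrap().is_empty()
    }
}

fn main() {
    let owned = Arc::new(Owned::new());

    // A remote thread binds a task...
    let remote = Arc::clone(&owned);
    thread::spawn(move || remote.push_front("task")).join().unwrap();

    // ...and the scheduler drains the list at shutdown, mirroring the
    // while-let loop in BasicScheduler's Drop impl above.
    while let Some(task) = owned.pop_back() {
        println!("shutting down {}", task);
    }
    assert!(owned.is_empty());
}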
15 changes: 6 additions & 9 deletions tokio/src/runtime/task/mod.rs
@@ -13,6 +13,9 @@ mod join;
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::join::JoinHandle;

+mod list;
+pub(super) use self::list::OwnedTasks;
+
 mod raw;
 use self::raw::RawTask;

@@ -21,11 +24,6 @@ use self::state::State;

 mod waker;

-cfg_rt_multi_thread! {
-    mod stack;
-    pub(crate) use self::stack::TransferStack;
-}
-
 use crate::future::Future;
 use crate::util::linked_list;

@@ -62,11 +60,10 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
    fn bind(task: Task<Self>) -> Self;

    /// The task has completed work and is ready to be released. The scheduler
-    /// is free to drop it whenever.
+    /// should release it immediately and return it. The task module will batch
+    /// the ref-dec with setting other options.
    ///
-    /// If the scheduler can immediately release the task, it should return
-    /// it as part of the function. This enables the task module to batch
-    /// the ref-dec with other options.
+    /// If the scheduler has already released the task, then None is returned.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;

    /// Schedule the task