Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make scheduler non-optional #3980

Merged
merged 1 commit into from Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 3 additions & 67 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -13,7 +13,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::runtime::task::Schedule;
use crate::util::linked_list;

use std::pin::Pin;
Expand All @@ -36,10 +36,6 @@ pub(super) struct Cell<T: Future, S> {
pub(super) trailer: Trailer,
}

pub(super) struct Scheduler<S> {
scheduler: UnsafeCell<Option<S>>,
}

pub(super) struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}
Expand All @@ -49,7 +45,7 @@ pub(super) struct CoreStage<T: Future> {
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
pub(super) scheduler: Scheduler<S>,
pub(super) scheduler: S,

/// Either the future or the output
pub(super) stage: CoreStage<T>,
Expand Down Expand Up @@ -106,9 +102,7 @@ impl<T: Future, S: Schedule> Cell<T, S> {
id,
},
core: Core {
scheduler: Scheduler {
scheduler: UnsafeCell::new(Some(scheduler)),
},
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
Expand All @@ -120,64 +114,6 @@ impl<T: Future, S: Schedule> Cell<T, S> {
}
}

impl<S: Schedule> Scheduler<S> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R {
self.scheduler.with_mut(f)
}

/// Returns true if the task is bound to a scheduler.
pub(super) fn is_bound(&self) -> bool {
// Safety: never called concurrently w/ a mutation.
self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
}

/// Schedule the future for execution
pub(super) fn schedule(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.schedule(task),
None => panic!("no scheduler set"),
}
});
}

/// Schedule the future for execution in the near future, yielding the
/// thread to other tasks.
pub(super) fn yield_now(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.yield_now(task),
None => panic!("no scheduler set"),
}
});
}

/// Release the task
///
/// If the `Scheduler` implementation is able to, it returns the `Task`
/// handle immediately. The caller of this function will batch a ref-dec
/// with a state change.
pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
use std::mem::ManuallyDrop;

let task = ManuallyDrop::new(task);

self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.release(&*task),
// Task was never polled
None => None,
}
})
}
}

impl<T: Future> CoreStage<T> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
self.stage.with_mut(f)
Expand Down
21 changes: 9 additions & 12 deletions tokio/src/runtime/task/harness.rs
@@ -1,5 +1,5 @@
use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
use crate::runtime::task::state::Snapshot;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};
Expand Down Expand Up @@ -95,7 +95,6 @@ where

// Check causality
self.core().stage.with_mut(drop);
self.core().scheduler.with_mut(drop);

unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
Expand Down Expand Up @@ -219,7 +218,7 @@ enum TransitionToRunning {

struct SchedulerView<'a, S> {
header: &'a Header,
scheduler: &'a Scheduler<S>,
scheduler: &'a S,
}

impl<'a, S> SchedulerView<'a, S>
Expand All @@ -233,17 +232,17 @@ where

/// Returns true if the task should be deallocated.
fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
let ref_dec = if self.scheduler.is_bound() {
if let Some(task) = self.scheduler.release(self.to_task()) {
mem::forget(task);
true
} else {
false
}
let me = self.to_task();

let ref_dec = if let Some(task) = self.scheduler.release(&me) {
mem::forget(task);
true
} else {
false
};

mem::forget(me);

// This might deallocate
let snapshot = self
.header
Expand All @@ -254,8 +253,6 @@ where
}

fn transition_to_running(&self) -> TransitionToRunning {
debug_assert!(self.scheduler.is_bound());

// Transition the task to the running state.
//
// A failure to transition here indicates the task has been cancelled
Expand Down