Skip to content

Commit

Permalink
rt: reduce duplicated code in task harness (#3314)
Browse files Browse the repository at this point in the history
Task spawning is a common operation that results in a lot of instantiations
of the task code. Reducing the amount of generated code should lead to
faster compile times overall.

This patch extracts code that does not depend on a type parameter into
functions with fewer or no type parameters at all. Reducing the amount of
duplicated code.
  • Loading branch information
Markus Westerlind committed Jan 14, 2021
1 parent 30b4a74 commit 204603b
Show file tree
Hide file tree
Showing 2 changed files with 378 additions and 267 deletions.
152 changes: 94 additions & 58 deletions tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::util::linked_list;

Expand All @@ -37,15 +36,23 @@ pub(super) struct Cell<T: Future, S> {
pub(super) trailer: Trailer,
}

pub(super) struct Scheduler<S> {
scheduler: UnsafeCell<Option<S>>,
}

pub(super) struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}

/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
pub(super) scheduler: UnsafeCell<Option<S>>,
pub(super) scheduler: Scheduler<S>,

/// Either the future or the output
pub(super) stage: UnsafeCell<Stage<T>>,
pub(super) stage: CoreStage<T>,
}

/// Crate public as this is also needed by the pool.
Expand Down Expand Up @@ -95,8 +102,12 @@ impl<T: Future, S: Schedule> Cell<T, S> {
vtable: raw::vtable::<T, S>(),
},
core: Core {
scheduler: UnsafeCell::new(None),
stage: UnsafeCell::new(Stage::Running(future)),
scheduler: Scheduler {
scheduler: UnsafeCell::new(None),
},
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
},
trailer: Trailer {
waker: UnsafeCell::new(None),
Expand All @@ -105,7 +116,11 @@ impl<T: Future, S: Schedule> Cell<T, S> {
}
}

impl<T: Future, S: Schedule> Core<T, S> {
impl<S: Schedule> Scheduler<S> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R {
self.scheduler.with_mut(f)
}

/// Bind a scheduler to the task.
///
/// This only happens on the first poll and must be preceeded by a call to
Expand Down Expand Up @@ -140,6 +155,58 @@ impl<T: Future, S: Schedule> Core<T, S> {
self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
}

/// Schedule the future for execution
pub(super) fn schedule(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.schedule(task),
None => panic!("no scheduler set"),
}
});
}

/// Schedule the future for execution in the near future, yielding the
/// thread to other tasks.
pub(super) fn yield_now(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.yield_now(task),
None => panic!("no scheduler set"),
}
});
}

/// Release the task
///
/// If the `Scheduler` implementation is able to, it returns the `Task`
/// handle immediately. The caller of this function will batch a ref-dec
/// with a state change.
pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
use std::mem::ManuallyDrop;

let task = ManuallyDrop::new(task);

self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.release(&*task),
// Task was never polled
None => None,
}
})
}
}

impl<T: Future> CoreStage<T> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
self.stage.with_mut(f)
}

/// Poll the future
///
/// # Safety
Expand All @@ -153,7 +220,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
///
/// `self` must also be pinned. This is handled by storing the task on the
/// heap.
pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> {
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.with_mut(|ptr| {
// Safety: The caller ensures mutual exclusion to the field.
Expand All @@ -165,11 +232,6 @@ impl<T: Future, S: Schedule> Core<T, S> {
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };

// The waker passed into the `poll` function does not require a ref
// count increment.
let waker_ref = waker_ref::<T, S>(header);
let mut cx = Context::from_waker(&*waker_ref);

future.poll(&mut cx)
})
};
Expand Down Expand Up @@ -221,52 +283,6 @@ impl<T: Future, S: Schedule> Core<T, S> {
}
})
}

/// Schedule the future for execution
pub(super) fn schedule(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.schedule(task),
None => panic!("no scheduler set"),
}
});
}

/// Schedule the future for execution in the near future, yielding the
/// thread to other tasks.
pub(super) fn yield_now(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.yield_now(task),
None => panic!("no scheduler set"),
}
});
}

/// Release the task
///
/// If the `Scheduler` implementation is able to, it returns the `Task`
/// handle immediately. The caller of this function will batch a ref-dec
/// with a state change.
pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
use std::mem::ManuallyDrop;

let task = ManuallyDrop::new(task);

self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.release(&*task),
// Task was never polled
None => None,
}
})
}
}

cfg_rt_multi_thread! {
Expand All @@ -280,6 +296,26 @@ cfg_rt_multi_thread! {
}
}

impl Trailer {
pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) {
self.waker.with_mut(|ptr| {
*ptr = waker;
});
}

pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool {
self.waker
.with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
}

pub(crate) fn wake_join(&self) {
self.waker.with(|ptr| match unsafe { &*ptr } {
Some(waker) => waker.wake_by_ref(),
None => panic!("waker missing"),
});
}
}

#[test]
#[cfg(not(loom))]
fn header_lte_cache_line() {
Expand Down

0 comments on commit 204603b

Please sign in to comment.