Skip to content

Commit

Permalink
rt: move CoreStage methods to Core (#5182)
Browse files Browse the repository at this point in the history
  • Loading branch information
agayev committed Nov 10, 2022
1 parent 53cba02 commit 9e3fb16
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 36 deletions.
8 changes: 5 additions & 3 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -155,7 +155,9 @@ impl<T: Future> CoreStage<T> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
self.stage.with_mut(f)
}
}

impl<T: Future, S: Schedule> Core<T, S> {
/// Polls the future.
///
/// # Safety
Expand All @@ -171,7 +173,7 @@ impl<T: Future> CoreStage<T> {
/// heap.
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.with_mut(|ptr| {
self.stage.stage.with_mut(|ptr| {
// Safety: The caller ensures mutual exclusion to the field.
let future = match unsafe { &mut *ptr } {
Stage::Running(future) => future,
Expand Down Expand Up @@ -224,7 +226,7 @@ impl<T: Future> CoreStage<T> {
pub(super) fn take_output(&self) -> super::Result<T::Output> {
use std::mem;

self.stage.with_mut(|ptr| {
self.stage.stage.with_mut(|ptr| {
// Safety:: the caller ensures mutual exclusion to the field.
match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
Stage::Finished(output) => output,
Expand All @@ -234,7 +236,7 @@ impl<T: Future> CoreStage<T> {
}

unsafe fn set_stage(&self, stage: Stage<T>) {
self.stage.with_mut(|ptr| *ptr = stage)
self.stage.stage.with_mut(|ptr| *ptr = stage)
}
}

Expand Down
53 changes: 20 additions & 33 deletions tokio/src/runtime/task/harness.rs
@@ -1,5 +1,5 @@
use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};
Expand Down Expand Up @@ -104,13 +104,7 @@ where
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let core = self.core();
let res = poll_future(
&core.stage,
&self.core().scheduler,
core.task_id.clone(),
cx,
);
let res = poll_future(self.core(), cx);

if res == Poll::Ready(()) {
// The future completed. Move on to complete the task.
Expand All @@ -124,15 +118,13 @@ where
TransitionToIdle::Cancelled => {
// The transition to idle failed because the task was
// cancelled during the poll.
let core = self.core();
cancel_task(&core.stage, core.task_id.clone());
cancel_task(self.core());
PollFuture::Complete
}
}
}
TransitionToRunning::Cancelled => {
let core = self.core();
cancel_task(&core.stage, core.task_id.clone());
cancel_task(self.core());
PollFuture::Complete
}
TransitionToRunning::Failed => PollFuture::Done,
Expand All @@ -155,8 +147,7 @@ where

// By transitioning the lifecycle to `Running`, we have permission to
// drop the future.
let core = self.core();
cancel_task(&core.stage, core.task_id.clone());
cancel_task(self.core());
self.complete();
}

Expand Down Expand Up @@ -190,7 +181,7 @@ where
/// Read the task output into `dst`.
pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
if can_read_output(self.header(), self.trailer(), waker) {
*dst = Poll::Ready(self.core().stage.take_output());
*dst = Poll::Ready(self.core().take_output());
}
}

Expand All @@ -215,7 +206,7 @@ where
// they are dropping the `JoinHandle`, we assume they are not
// interested in the panic and swallow it.
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
self.core().stage.drop_future_or_output();
self.core().drop_future_or_output();
}));
}

Expand Down Expand Up @@ -325,7 +316,7 @@ where
// The `JoinHandle` is not interested in the output of
// this task. It is our responsibility to drop the
// output.
self.core().stage.drop_future_or_output();
self.core().drop_future_or_output();
} else if snapshot.has_join_waker() {
// Notify the join handle. The previous transition obtains the
// lock on the waker cell.
Expand Down Expand Up @@ -457,36 +448,32 @@ enum PollFuture {
}

/// Cancels the task and store the appropriate error in the stage field.
fn cancel_task<T: Future>(stage: &CoreStage<T>, id: super::Id) {
fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
// Drop the future from a panic guard.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
stage.drop_future_or_output();
core.drop_future_or_output();
}));

let id = core.task_id.clone();
match res {
Ok(()) => {
stage.store_output(Err(JoinError::cancelled(id)));
core.store_output(Err(JoinError::cancelled(id)));
}
Err(panic) => {
stage.store_output(Err(JoinError::panic(id, panic)));
core.store_output(Err(JoinError::panic(id, panic)));
}
}
}

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future, S: Schedule>(
core: &CoreStage<T>,
scheduler: &S,
id: super::Id,
cx: Context<'_>,
) -> Poll<()> {
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
// Poll the future.
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
struct Guard<'a, T: Future> {
core: &'a CoreStage<T>,
struct Guard<'a, T: Future, S: Schedule> {
core: &'a Core<T, S>,
}
impl<'a, T: Future> Drop for Guard<'a, T> {
impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
fn drop(&mut self) {
// If the future panics on poll, we drop it inside the panic
// guard.
Expand All @@ -504,8 +491,8 @@ fn poll_future<T: Future, S: Schedule>(
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => {
scheduler.unhandled_panic();
Err(JoinError::panic(id, panic))
core.scheduler.unhandled_panic();
Err(JoinError::panic(core.task_id.clone(), panic))
}
};

Expand All @@ -515,7 +502,7 @@ fn poll_future<T: Future, S: Schedule>(
}));

if res.is_err() {
scheduler.unhandled_panic();
core.scheduler.unhandled_panic();
}

Poll::Ready(())
Expand Down

0 comments on commit 9e3fb16

Please sign in to comment.