diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 2b7050a2111..3e07d7c97fd 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -155,7 +155,9 @@ impl CoreStage { pub(super) fn with_mut(&self, f: impl FnOnce(*mut Stage) -> R) -> R { self.stage.with_mut(f) } +} +impl Core { /// Polls the future. /// /// # Safety @@ -171,7 +173,7 @@ impl CoreStage { /// heap. pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll { let res = { - self.stage.with_mut(|ptr| { + self.stage.stage.with_mut(|ptr| { // Safety: The caller ensures mutual exclusion to the field. let future = match unsafe { &mut *ptr } { Stage::Running(future) => future, @@ -224,7 +226,7 @@ impl CoreStage { pub(super) fn take_output(&self) -> super::Result { use std::mem; - self.stage.with_mut(|ptr| { + self.stage.stage.with_mut(|ptr| { // Safety:: the caller ensures mutual exclusion to the field. match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { Stage::Finished(output) => output, @@ -234,7 +236,7 @@ impl CoreStage { } unsafe fn set_stage(&self, stage: Stage) { - self.stage.with_mut(|ptr| *ptr = stage) + self.stage.stage.with_mut(|ptr| *ptr = stage) } } diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 70415799225..545b01b7ff7 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -1,5 +1,5 @@ use crate::future::Future; -use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer}; +use crate::runtime::task::core::{Cell, Core, Header, Trailer}; use crate::runtime::task::state::{Snapshot, State}; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{JoinError, Notified, Schedule, Task}; @@ -104,13 +104,7 @@ where let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let core = self.core(); - let res = poll_future( - &core.stage, - &self.core().scheduler, - core.task_id.clone(), - cx, - ); + let res = poll_future(self.core(), cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -124,15 +118,13 @@ where TransitionToIdle::Cancelled => { // The transition to idle failed because the task was // cancelled during the poll. - let core = self.core(); - cancel_task(&core.stage, core.task_id.clone()); + cancel_task(self.core()); PollFuture::Complete } } } TransitionToRunning::Cancelled => { - let core = self.core(); - cancel_task(&core.stage, core.task_id.clone()); + cancel_task(self.core()); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done, @@ -155,8 +147,7 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. - let core = self.core(); - cancel_task(&core.stage, core.task_id.clone()); + cancel_task(self.core()); self.complete(); } @@ -190,7 +181,7 @@ where /// Read the task output into `dst`. pub(super) fn try_read_output(self, dst: &mut Poll>, waker: &Waker) { if can_read_output(self.header(), self.trailer(), waker) { - *dst = Poll::Ready(self.core().stage.take_output()); + *dst = Poll::Ready(self.core().take_output()); } } @@ -215,7 +206,7 @@ where // they are dropping the `JoinHandle`, we assume they are not // interested in the panic and swallow it. let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { - self.core().stage.drop_future_or_output(); + self.core().drop_future_or_output(); })); } @@ -325,7 +316,7 @@ where // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the // output. - self.core().stage.drop_future_or_output(); + self.core().drop_future_or_output(); } else if snapshot.has_join_waker() { // Notify the join handle. The previous transition obtains the // lock on the waker cell. @@ -457,36 +448,32 @@ enum PollFuture { } /// Cancels the task and store the appropriate error in the stage field. -fn cancel_task(stage: &CoreStage, id: super::Id) { +fn cancel_task(core: &Core) { // Drop the future from a panic guard. let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { - stage.drop_future_or_output(); + core.drop_future_or_output(); })); + let id = core.task_id.clone(); match res { Ok(()) => { - stage.store_output(Err(JoinError::cancelled(id))); + core.store_output(Err(JoinError::cancelled(id))); } Err(panic) => { - stage.store_output(Err(JoinError::panic(id, panic))); + core.store_output(Err(JoinError::panic(id, panic))); } } } /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future( - core: &CoreStage, - scheduler: &S, - id: super::Id, - cx: Context<'_>, -) -> Poll<()> { +fn poll_future(core: &Core, cx: Context<'_>) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { - struct Guard<'a, T: Future> { - core: &'a CoreStage, + struct Guard<'a, T: Future, S: Schedule> { + core: &'a Core, } - impl<'a, T: Future> Drop for Guard<'a, T> { + impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> { fn drop(&mut self) { // If the future panics on poll, we drop it inside the panic // guard. @@ -504,8 +491,8 @@ fn poll_future( Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), Err(panic) => { - scheduler.unhandled_panic(); - Err(JoinError::panic(id, panic)) + core.scheduler.unhandled_panic(); + Err(JoinError::panic(core.task_id.clone(), panic)) } }; @@ -515,7 +502,7 @@ fn poll_future( })); if res.is_err() { - scheduler.unhandled_panic(); + core.scheduler.unhandled_panic(); } Poll::Ready(())