From d58c412cf9e9dbe03dd896f5441bd4abba5ae20b Mon Sep 17 00:00:00 2001 From: Abutalib Aghayev Date: Thu, 10 Nov 2022 07:09:13 -0500 Subject: [PATCH 1/4] rt: move CoreStage functions to Core --- tokio/src/runtime/task/core.rs | 8 ++++-- tokio/src/runtime/task/harness.rs | 48 ++++++++++++------------------- tokio/src/runtime/task/mod.rs | 3 +- 3 files changed, 25 insertions(+), 34 deletions(-) diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 2b7050a2111..3e07d7c97fd 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -155,7 +155,9 @@ impl CoreStage { pub(super) fn with_mut(&self, f: impl FnOnce(*mut Stage) -> R) -> R { self.stage.with_mut(f) } +} +impl Core { /// Polls the future. /// /// # Safety @@ -171,7 +173,7 @@ impl CoreStage { /// heap. pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll { let res = { - self.stage.with_mut(|ptr| { + self.stage.stage.with_mut(|ptr| { // Safety: The caller ensures mutual exclusion to the field. let future = match unsafe { &mut *ptr } { Stage::Running(future) => future, @@ -224,7 +226,7 @@ impl CoreStage { pub(super) fn take_output(&self) -> super::Result { use std::mem; - self.stage.with_mut(|ptr| { + self.stage.stage.with_mut(|ptr| { // Safety:: the caller ensures mutual exclusion to the field. match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { Stage::Finished(output) => output, @@ -234,7 +236,7 @@ impl CoreStage { } unsafe fn set_stage(&self, stage: Stage) { - self.stage.with_mut(|ptr| *ptr = stage) + self.stage.stage.with_mut(|ptr| *ptr = stage) } } diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 70415799225..92d127d0c24 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -1,5 +1,5 @@ use crate::future::Future; -use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer}; +use crate::runtime::task::core::{Cell, Core, Header, Trailer}; use crate::runtime::task::state::{Snapshot, State}; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{JoinError, Notified, Schedule, Task}; @@ -105,12 +105,7 @@ where let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); let core = self.core(); - let res = poll_future( - &core.stage, - &self.core().scheduler, - core.task_id.clone(), - cx, - ); + let res = poll_future(&core, cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -125,14 +120,14 @@ where // The transition to idle failed because the task was // cancelled during the poll. let core = self.core(); - cancel_task(&core.stage, core.task_id.clone()); + cancel_task(&core); PollFuture::Complete } } } TransitionToRunning::Cancelled => { let core = self.core(); - cancel_task(&core.stage, core.task_id.clone()); + cancel_task(&core); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done, @@ -156,7 +151,7 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. let core = self.core(); - cancel_task(&core.stage, core.task_id.clone()); + cancel_task(&core); self.complete(); } @@ -190,7 +185,7 @@ where /// Read the task output into `dst`. pub(super) fn try_read_output(self, dst: &mut Poll>, waker: &Waker) { if can_read_output(self.header(), self.trailer(), waker) { - *dst = Poll::Ready(self.core().stage.take_output()); + *dst = Poll::Ready(self.core().take_output()); } } @@ -215,7 +210,7 @@ where // they are dropping the `JoinHandle`, we assume they are not // interested in the panic and swallow it. let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { - self.core().stage.drop_future_or_output(); + self.core().drop_future_or_output(); })); } @@ -325,7 +320,7 @@ where // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the // output. - self.core().stage.drop_future_or_output(); + self.core().drop_future_or_output(); } else if snapshot.has_join_waker() { // Notify the join handle. The previous transition obtains the // lock on the waker cell. @@ -457,36 +452,31 @@ enum PollFuture { } /// Cancels the task and store the appropriate error in the stage field. -fn cancel_task(stage: &CoreStage, id: super::Id) { +fn cancel_task(core: &Core) { // Drop the future from a panic guard. let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { - stage.drop_future_or_output(); + core.drop_future_or_output(); })); match res { Ok(()) => { - stage.store_output(Err(JoinError::cancelled(id))); + core.store_output(Err(JoinError::cancelled(core.task_id))); } Err(panic) => { - stage.store_output(Err(JoinError::panic(id, panic))); + core.store_output(Err(JoinError::panic(core.task_id, panic))); } } } /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future( - core: &CoreStage, - scheduler: &S, - id: super::Id, - cx: Context<'_>, -) -> Poll<()> { +fn poll_future(core: &Core, cx: Context<'_>) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { - struct Guard<'a, T: Future> { - core: &'a CoreStage, + struct Guard<'a, T: Future, S: Schedule> { + core: &'a Core, } - impl<'a, T: Future> Drop for Guard<'a, T> { + impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> { fn drop(&mut self) { // If the future panics on poll, we drop it inside the panic // guard. @@ -504,8 +494,8 @@ fn poll_future( Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), Err(panic) => { - scheduler.unhandled_panic(); - Err(JoinError::panic(id, panic)) + core.scheduler.unhandled_panic(); + Err(JoinError::panic(core.task_id, panic)) } }; @@ -515,7 +505,7 @@ fn poll_future( })); if res.is_err() { - scheduler.unhandled_panic(); + core.scheduler.unhandled_panic(); } Poll::Ready(()) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 3d5b1cbf373..09449014cf4 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -201,8 +201,7 @@ use std::{fmt, mem}; /// [unstable]: crate#unstable-features #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well... -#[derive(Clone, Debug, Hash, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] pub struct Id(u64); /// An owned handle to the task, tracked by ref count. From b1b92be32a4207cb3d17422cfd348f633395a3f5 Mon Sep 17 00:00:00 2001 From: Abutalib Aghayev Date: Thu, 10 Nov 2022 07:37:21 -0500 Subject: [PATCH 2/4] revert making Id Clone and remove redundant variables --- tokio/src/runtime/task/harness.rs | 19 ++++++++----------- tokio/src/runtime/task/mod.rs | 3 ++- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 92d127d0c24..d97ab14b075 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -104,8 +104,7 @@ where let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let core = self.core(); - let res = poll_future(&core, cx); + let res = poll_future(&self.core(), cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -119,15 +118,13 @@ where TransitionToIdle::Cancelled => { // The transition to idle failed because the task was // cancelled during the poll. - let core = self.core(); - cancel_task(&core); + cancel_task(&self.core()); PollFuture::Complete } } } TransitionToRunning::Cancelled => { - let core = self.core(); - cancel_task(&core); + cancel_task(&self.core()); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done, @@ -150,8 +147,7 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. - let core = self.core(); - cancel_task(&core); + cancel_task(&self.core()); self.complete(); } @@ -458,12 +454,13 @@ fn cancel_task(core: &Core) { core.drop_future_or_output(); })); + let id = core.task_id.clone(); match res { Ok(()) => { - core.store_output(Err(JoinError::cancelled(core.task_id))); + core.store_output(Err(JoinError::cancelled(id))); } Err(panic) => { - core.store_output(Err(JoinError::panic(core.task_id, panic))); + core.store_output(Err(JoinError::panic(id, panic))); } } } @@ -495,7 +492,7 @@ fn poll_future(core: &Core, cx: Context<'_>) -> Po Ok(Poll::Ready(output)) => Ok(output), Err(panic) => { core.scheduler.unhandled_panic(); - Err(JoinError::panic(core.task_id, panic)) + Err(JoinError::panic(core.task_id.clone(), panic)) } }; diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 09449014cf4..3d5b1cbf373 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -201,7 +201,8 @@ use std::{fmt, mem}; /// [unstable]: crate#unstable-features #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] +// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well... +#[derive(Clone, Debug, Hash, Eq, PartialEq)] pub struct Id(u64); /// An owned handle to the task, tracked by ref count. From a895cc7b6351f9c37a129f6111aad791137bf597 Mon Sep 17 00:00:00 2001 From: Abutalib Aghayev Date: Thu, 10 Nov 2022 07:42:40 -0500 Subject: [PATCH 3/4] fix clippy warnings --- tokio/src/runtime/task/harness.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index d97ab14b075..c0efdd124cf 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -104,7 +104,7 @@ where let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let res = poll_future(&self.core(), cx); + let res = poll_future(self.core(), cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -118,7 +118,7 @@ where TransitionToIdle::Cancelled => { // The transition to idle failed because the task was // cancelled during the poll. - cancel_task(&self.core()); + cancel_task(self.core()); PollFuture::Complete } } @@ -147,7 +147,7 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. - cancel_task(&self.core()); + cancel_task(self.core()); self.complete(); } From 7033a4e5768738e065de98248029c09faac51d40 Mon Sep 17 00:00:00 2001 From: Abutalib Aghayev Date: Thu, 10 Nov 2022 07:47:29 -0500 Subject: [PATCH 4/4] one more clippy fix --- tokio/src/runtime/task/harness.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index c0efdd124cf..545b01b7ff7 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -124,7 +124,7 @@ where } } TransitionToRunning::Cancelled => { - cancel_task(&self.core()); + cancel_task(self.core()); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done,