From a201b91a245f2558ed58f0b516af79df63e4fc77 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 21 Apr 2022 10:32:20 -0700 Subject: [PATCH] assign IDs sequentially instead of by addr Signed-off-by: Eliza Weisman --- tokio/src/runtime/task/abort.rs | 14 ++++++++------ tokio/src/runtime/task/core.rs | 8 ++++++-- tokio/src/runtime/task/harness.rs | 14 ++++++++------ tokio/src/runtime/task/join.rs | 15 +++++++++------ tokio/src/runtime/task/mod.rs | 31 +++++++++---------------------- tokio/src/runtime/task/raw.rs | 10 +++------- 6 files changed, 43 insertions(+), 49 deletions(-) diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index ddea6803ab9..cad639ca0c8 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -1,4 +1,4 @@ -use crate::runtime::task::RawTask; +use crate::runtime::task::{Id, RawTask}; use std::fmt; use std::panic::{RefUnwindSafe, UnwindSafe}; @@ -21,11 +21,12 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] pub struct AbortHandle { raw: Option, + id: Id, } impl AbortHandle { - pub(super) fn new(raw: Option) -> Self { - Self { raw } + pub(super) fn new(raw: Option, id: Id) -> Self { + Self { raw, id } } /// Abort the task associated with the handle. @@ -60,8 +61,7 @@ impl AbortHandle { #[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn id(&self) -> super::Id { - // XXX(eliza): should this return an option instead? probably not... - self.raw.unwrap().id() + self.id.clone() } } @@ -73,7 +73,9 @@ impl RefUnwindSafe for AbortHandle {} impl fmt::Debug for AbortHandle { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("AbortHandle").finish() + fmt.debug_struct("AbortHandle") + .field("id", &self.id) + .finish() } } diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 776e8341f37..548c56da3d4 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -13,7 +13,7 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; -use crate::runtime::task::Schedule; +use crate::runtime::task::{Id, Schedule}; use crate::util::linked_list; use std::pin::Pin; @@ -49,6 +49,9 @@ pub(super) struct Core { /// Either the future or the output. pub(super) stage: CoreStage, + + /// The task's ID, used for populating `JoinError`s. + pub(super) task_id: Id, } /// Crate public as this is also needed by the pool. @@ -102,7 +105,7 @@ pub(super) enum Stage { impl Cell { /// Allocates a new task cell, containing the header, trailer, and core /// structures. - pub(super) fn new(future: T, scheduler: S, state: State) -> Box> { + pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box> { #[cfg(all(tokio_unstable, feature = "tracing"))] let id = future.id(); Box::new(Cell { @@ -120,6 +123,7 @@ impl Cell { stage: CoreStage { stage: UnsafeCell::new(Stage::Running(future)), }, + task_id, }, trailer: Trailer { waker: UnsafeCell::new(None), diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index d88e01f0cc9..1d3ababfb17 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -100,8 +100,8 @@ where let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let id = super::Id::from_raw(header_ptr); - let res = poll_future(&self.core().stage, id.clone(), cx); + let core = self.core(); + let res = poll_future(&core.stage, core.task_id.clone(), cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -115,14 +115,15 @@ where TransitionToIdle::Cancelled => { // The transition to idle failed because the task was // cancelled during the poll. - - cancel_task(&self.core().stage, id); + let core = self.core(); + cancel_task(&core.stage, core.task_id.clone()); PollFuture::Complete } } } TransitionToRunning::Cancelled => { - cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr())); + let core = self.core(); + cancel_task(&core.stage, core.task_id.clone()); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done, @@ -145,7 +146,8 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. - cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr())); + let core = self.core(); + cancel_task(&core.stage, core.task_id.clone()); self.complete(); } diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 7f4dd75b7a0..b35a2c859a8 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -1,4 +1,4 @@ -use crate::runtime::task::RawTask; +use crate::runtime::task::{Id, RawTask}; use std::fmt; use std::future::Future; @@ -144,6 +144,7 @@ cfg_rt! { /// [`JoinError`]: crate::task::JoinError pub struct JoinHandle { raw: Option, + id: Id, _p: PhantomData, } } @@ -155,9 +156,10 @@ impl UnwindSafe for JoinHandle {} impl RefUnwindSafe for JoinHandle {} impl JoinHandle { - pub(super) fn new(raw: RawTask) -> JoinHandle { + pub(super) fn new(raw: RawTask, id: Id) -> JoinHandle { JoinHandle { raw: Some(raw), + id, _p: PhantomData, } } @@ -218,7 +220,7 @@ impl JoinHandle { raw.ref_inc(); raw }); - super::AbortHandle::new(raw) + super::AbortHandle::new(raw, self.id.clone()) } /// Returns a [task ID] that uniquely identifies this task relative to other @@ -235,8 +237,7 @@ impl JoinHandle { #[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn id(&self) -> super::Id { - // XXX(eliza): should this return an option instead? probably not... - self.raw.unwrap().id() + self.id.clone() } } @@ -298,6 +299,8 @@ where T: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("JoinHandle").finish() + fmt.debug_struct("JoinHandle") + .field("id", &self.id) + .finish() } } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index bc28f399f63..879edc5c10d 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -203,7 +203,7 @@ use std::{fmt, mem}; #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] // TODO(eliza): there's almost certainly no reason not to make this `Copy` as well... #[derive(Clone, Debug, Hash, Eq, PartialEq)] -pub struct Id(std::num::NonZeroUsize); +pub struct Id(usize); /// An owned handle to the task, tracked by ref count. #[repr(transparent)] @@ -278,7 +278,8 @@ cfg_rt! { T: Future + 'static, T::Output: 'static, { - let raw = RawTask::new::(task, scheduler); + let id = Id::next(); + let raw = RawTask::new::(task, scheduler, id.clone()); let task = Task { raw, _p: PhantomData, @@ -287,7 +288,7 @@ cfg_rt! { raw, _p: PhantomData, }); - let join = JoinHandle::new(raw); + let join = JoinHandle::new(raw, id); (task, notified, join) } @@ -477,25 +478,11 @@ impl fmt::Display for Id { self.0.fmt(f) } } -impl Id { - #[inline] - fn from_raw(ptr: NonNull
) -> Self { - use std::num::NonZeroUsize; - let addr = ptr.as_ptr() as usize; - - #[cfg(debug_assertions)] - let inner = NonZeroUsize::new(addr) - .expect("a `NonNull` pointer will never be 0 when cast to `usize`"); - - #[cfg(not(debug_assertions))] - let inner = unsafe { - // Safety: `addr` was cast from a `NonNull` pointer, which must - // never be null (0). Since the pointer is not null, the integer - // will never be zero, so this is safe as long as the `NonNull` was - // constructed safely. - NonZeroUsize::new_unchecked(addr) - }; - Self(inner) +impl Id { + fn next() -> Self { + use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; + static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + Self(NEXT_ID.fetch_add(1, Relaxed)) } } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 5beb60a1f7b..5555298a4d4 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -1,5 +1,5 @@ use crate::future::Future; -use crate::runtime::task::{Cell, Harness, Header, Schedule, State}; +use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State}; use std::ptr::NonNull; use std::task::{Poll, Waker}; @@ -52,12 +52,12 @@ pub(super) fn vtable() -> &'static Vtable { } impl RawTask { - pub(super) fn new(task: T, scheduler: S) -> RawTask + pub(super) fn new(task: T, scheduler: S, id: Id) -> RawTask where T: Future, S: Schedule, { - let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new())); + let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id)); let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) }; RawTask { ptr } @@ -129,10 +129,6 @@ impl RawTask { pub(super) fn ref_inc(self) { self.header().state.ref_inc(); } - - pub(super) fn id(&self) -> super::Id { - super::Id::from_raw(self.ptr) - } } impl Clone for RawTask {