Skip to content

Commit

Permalink
assign IDs sequentially instead of by addr
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Apr 21, 2022
1 parent a332b7a commit a201b91
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 49 deletions.
14 changes: 8 additions & 6 deletions tokio/src/runtime/task/abort.rs
@@ -1,4 +1,4 @@
use crate::runtime::task::RawTask;
use crate::runtime::task::{Id, RawTask};
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};

Expand All @@ -21,11 +21,12 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
pub struct AbortHandle {
raw: Option<RawTask>,
id: Id,
}

impl AbortHandle {
pub(super) fn new(raw: Option<RawTask>) -> Self {
Self { raw }
pub(super) fn new(raw: Option<RawTask>, id: Id) -> Self {
Self { raw, id }
}

/// Abort the task associated with the handle.
Expand Down Expand Up @@ -60,8 +61,7 @@ impl AbortHandle {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
// XXX(eliza): should this return an option instead? probably not...
self.raw.unwrap().id()
self.id.clone()
}
}

Expand All @@ -73,7 +73,9 @@ impl RefUnwindSafe for AbortHandle {}

impl fmt::Debug for AbortHandle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("AbortHandle").finish()
fmt.debug_struct("AbortHandle")
.field("id", &self.id)
.finish()
}
}

Expand Down
8 changes: 6 additions & 2 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -13,7 +13,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::Schedule;
use crate::runtime::task::{Id, Schedule};
use crate::util::linked_list;

use std::pin::Pin;
Expand Down Expand Up @@ -49,6 +49,9 @@ pub(super) struct Core<T: Future, S> {

/// Either the future or the output.
pub(super) stage: CoreStage<T>,

/// The task's ID, used for populating `JoinError`s.
pub(super) task_id: Id,
}

/// Crate public as this is also needed by the pool.
Expand Down Expand Up @@ -102,7 +105,7 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, scheduler: S, state: State) -> Box<Cell<T, S>> {
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let id = future.id();
Box::new(Cell {
Expand All @@ -120,6 +123,7 @@ impl<T: Future, S: Schedule> Cell<T, S> {
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
},
trailer: Trailer {
waker: UnsafeCell::new(None),
Expand Down
14 changes: 8 additions & 6 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -100,8 +100,8 @@ where
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let id = super::Id::from_raw(header_ptr);
let res = poll_future(&self.core().stage, id.clone(), cx);
let core = self.core();
let res = poll_future(&core.stage, core.task_id.clone(), cx);

if res == Poll::Ready(()) {
// The future completed. Move on to complete the task.
Expand All @@ -115,14 +115,15 @@ where
TransitionToIdle::Cancelled => {
// The transition to idle failed because the task was
// cancelled during the poll.

cancel_task(&self.core().stage, id);
let core = self.core();
cancel_task(&core.stage, core.task_id.clone());
PollFuture::Complete
}
}
}
TransitionToRunning::Cancelled => {
cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr()));
let core = self.core();
cancel_task(&core.stage, core.task_id.clone());
PollFuture::Complete
}
TransitionToRunning::Failed => PollFuture::Done,
Expand All @@ -145,7 +146,8 @@ where

// By transitioning the lifecycle to `Running`, we have permission to
// drop the future.
cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr()));
let core = self.core();
cancel_task(&core.stage, core.task_id.clone());
self.complete();
}

Expand Down
15 changes: 9 additions & 6 deletions tokio/src/runtime/task/join.rs
@@ -1,4 +1,4 @@
use crate::runtime::task::RawTask;
use crate::runtime::task::{Id, RawTask};

use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -144,6 +144,7 @@ cfg_rt! {
/// [`JoinError`]: crate::task::JoinError
pub struct JoinHandle<T> {
raw: Option<RawTask>,
id: Id,
_p: PhantomData<T>,
}
}
Expand All @@ -155,9 +156,10 @@ impl<T> UnwindSafe for JoinHandle<T> {}
impl<T> RefUnwindSafe for JoinHandle<T> {}

impl<T> JoinHandle<T> {
pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
pub(super) fn new(raw: RawTask, id: Id) -> JoinHandle<T> {
JoinHandle {
raw: Some(raw),
id,
_p: PhantomData,
}
}
Expand Down Expand Up @@ -218,7 +220,7 @@ impl<T> JoinHandle<T> {
raw.ref_inc();
raw
});
super::AbortHandle::new(raw)
super::AbortHandle::new(raw, self.id.clone())
}

/// Returns a [task ID] that uniquely identifies this task relative to other
Expand All @@ -235,8 +237,7 @@ impl<T> JoinHandle<T> {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
// XXX(eliza): should this return an option instead? probably not...
self.raw.unwrap().id()
self.id.clone()
}
}

Expand Down Expand Up @@ -298,6 +299,8 @@ where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("JoinHandle").finish()
fmt.debug_struct("JoinHandle")
.field("id", &self.id)
.finish()
}
}
31 changes: 9 additions & 22 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -203,7 +203,7 @@ use std::{fmt, mem};
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well...
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct Id(std::num::NonZeroUsize);
pub struct Id(usize);

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
Expand Down Expand Up @@ -278,7 +278,8 @@ cfg_rt! {
T: Future + 'static,
T::Output: 'static,
{
let raw = RawTask::new::<T, S>(task, scheduler);
let id = Id::next();
let raw = RawTask::new::<T, S>(task, scheduler, id.clone());
let task = Task {
raw,
_p: PhantomData,
Expand All @@ -287,7 +288,7 @@ cfg_rt! {
raw,
_p: PhantomData,
});
let join = JoinHandle::new(raw);
let join = JoinHandle::new(raw, id);

(task, notified, join)
}
Expand Down Expand Up @@ -477,25 +478,11 @@ impl fmt::Display for Id {
self.0.fmt(f)
}
}
impl Id {
#[inline]
fn from_raw(ptr: NonNull<Header>) -> Self {
use std::num::NonZeroUsize;
let addr = ptr.as_ptr() as usize;

#[cfg(debug_assertions)]
let inner = NonZeroUsize::new(addr)
.expect("a `NonNull` pointer will never be 0 when cast to `usize`");

#[cfg(not(debug_assertions))]
let inner = unsafe {
// Safety: `addr` was cast from a `NonNull` pointer, which must
// never be null (0). Since the pointer is not null, the integer
// will never be zero, so this is safe as long as the `NonNull` was
// constructed safely.
NonZeroUsize::new_unchecked(addr)
};

Self(inner)
impl Id {
fn next() -> Self {
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
Self(NEXT_ID.fetch_add(1, Relaxed))
}
}
10 changes: 3 additions & 7 deletions tokio/src/runtime/task/raw.rs
@@ -1,5 +1,5 @@
use crate::future::Future;
use crate::runtime::task::{Cell, Harness, Header, Schedule, State};
use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State};

use std::ptr::NonNull;
use std::task::{Poll, Waker};
Expand Down Expand Up @@ -52,12 +52,12 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
}

impl RawTask {
pub(super) fn new<T, S>(task: T, scheduler: S) -> RawTask
pub(super) fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
where
T: Future,
S: Schedule,
{
let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new()));
let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };

RawTask { ptr }
Expand Down Expand Up @@ -129,10 +129,6 @@ impl RawTask {
pub(super) fn ref_inc(self) {
self.header().state.ref_inc();
}

pub(super) fn id(&self) -> super::Id {
super::Id::from_raw(self.ptr)
}
}

impl Clone for RawTask {
Expand Down

0 comments on commit a201b91

Please sign in to comment.