From 45e37dbfa27162e3ecb1ca2a902a7d21eeecda8b Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Nov 2022 00:38:22 +0100 Subject: [PATCH] runtime: reduce codegen per task (#5213) This PR should hopefully reduce the amount of code generated per future-type spawned on the runtime. The following methods are no longer generic: * `try_set_join_waker` * `remote_abort` * `clone_waker` * `drop_waker` * `wake_by_ref` * `wake_by_val` A new method is added to the vtable called schedule, which is used when a task should be scheduled on the runtime. E.g. wake_by_ref will call it if the state change says that the task needs to be scheduled. However, this method is only generic over the scheduler, and not the future type, so it also isn't generated for every task. Additionally, one of the changes involved in the above makes it possible to remove the id field from JoinHandle and AbortHandle. --- tokio/src/runtime/task/abort.rs | 35 +++--- tokio/src/runtime/task/core.rs | 74 ++++++++++-- tokio/src/runtime/task/harness.rs | 186 +++++++++++++++--------------- tokio/src/runtime/task/join.rs | 67 ++++------- tokio/src/runtime/task/mod.rs | 2 +- tokio/src/runtime/task/raw.rs | 134 +++++++++++++++------ tokio/src/runtime/task/waker.rs | 82 +++++-------- 7 files changed, 323 insertions(+), 257 deletions(-) diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index bfdf53c5105..6edca100404 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -1,4 +1,4 @@ -use crate::runtime::task::{Id, RawTask}; +use crate::runtime::task::{Header, RawTask}; use std::fmt; use std::panic::{RefUnwindSafe, UnwindSafe}; @@ -14,13 +14,12 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; /// [`JoinHandle`]: crate::task::JoinHandle #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub struct AbortHandle { - raw: Option, - id: Id, + raw: RawTask, } impl AbortHandle { - pub(super) fn new(raw: Option, id: Id) -> Self { - Self { raw, id } + pub(super) fn new(raw: RawTask) -> Self { + Self { raw } } /// Abort the task associated with the handle. @@ -35,9 +34,7 @@ impl AbortHandle { /// [cancelled]: method@super::error::JoinError::is_cancelled /// [`JoinHandle::abort`]: method@super::JoinHandle::abort pub fn abort(&self) { - if let Some(ref raw) = self.raw { - raw.remote_abort(); - } + self.raw.remote_abort(); } /// Checks if the task associated with this `AbortHandle` has finished. @@ -47,12 +44,8 @@ impl AbortHandle { /// some time, and this method does not return `true` until it has /// completed. pub fn is_finished(&self) -> bool { - if let Some(raw) = self.raw { - let state = raw.header().state.load(); - state.is_complete() - } else { - true - } + let state = self.raw.state().load(); + state.is_complete() } /// Returns a [task ID] that uniquely identifies this task relative to other @@ -67,7 +60,8 @@ impl AbortHandle { #[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn id(&self) -> super::Id { - self.id + // Safety: The header pointer is valid. + unsafe { Header::get_id(self.raw.header_ptr()) } } } @@ -79,16 +73,15 @@ impl RefUnwindSafe for AbortHandle {} impl fmt::Debug for AbortHandle { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("AbortHandle") - .field("id", &self.id) - .finish() + // Safety: The header pointer is valid. + let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) }; + let id = unsafe { id_ptr.as_ref() }; + fmt.debug_struct("AbortHandle").field("id", id).finish() } } impl Drop for AbortHandle { fn drop(&mut self) { - if let Some(raw) = self.raw.take() { - raw.drop_abort_handle(); - } + self.raw.drop_abort_handle(); } } diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index c4a7c6c72e3..bcccc6988c0 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -25,6 +25,9 @@ use std::task::{Context, Poll, Waker}; /// /// It is critical for `Header` to be the first field as the task structure will /// be referenced by both *mut Cell and *mut Header. +/// +/// Any changes to the layout of this struct _must_ also be reflected in the +/// const fns in raw.rs. #[repr(C)] pub(super) struct Cell { /// Hot task state data @@ -44,15 +47,19 @@ pub(super) struct CoreStage { /// The core of the task. /// /// Holds the future or output, depending on the stage of execution. +/// +/// Any changes to the layout of this struct _must_ also be reflected in the +/// const fns in raw.rs. +#[repr(C)] pub(super) struct Core { /// Scheduler used to drive this future. pub(super) scheduler: S, - /// Either the future or the output. - pub(super) stage: CoreStage, - /// The task's ID, used for populating `JoinError`s. pub(super) task_id: Id, + + /// Either the future or the output. + pub(super) stage: CoreStage, } /// Crate public as this is also needed by the pool. @@ -82,7 +89,7 @@ pub(crate) struct Header { /// The tracing ID for this instrumented task. #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(super) id: Option, + pub(super) tracing_id: Option, } unsafe impl Send for Header {} @@ -117,7 +124,7 @@ impl Cell { /// structures. pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box> { #[cfg(all(tokio_unstable, feature = "tracing"))] - let id = future.id(); + let tracing_id = future.id(); let result = Box::new(Cell { header: Header { state, @@ -125,7 +132,7 @@ impl Cell { vtable: raw::vtable::(), owner_id: UnsafeCell::new(0), #[cfg(all(tokio_unstable, feature = "tracing"))] - id, + tracing_id, }, core: Core { scheduler, @@ -144,8 +151,16 @@ impl Cell { { let trailer_addr = (&result.trailer) as *const Trailer as usize; let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) }; - assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize); + + let scheduler_addr = (&result.core.scheduler) as *const S as usize; + let scheduler_ptr = + unsafe { Header::get_scheduler::(NonNull::from(&result.header)) }; + assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize); + + let id_addr = (&result.core.task_id) as *const Id as usize; + let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(&result.header)) }; + assert_eq!(id_addr, id_ptr.as_ptr() as usize); } result @@ -295,6 +310,51 @@ impl Header { let trailer = me.as_ptr().cast::().add(offset).cast::(); NonNull::new_unchecked(trailer) } + + /// Gets a pointer to the scheduler of the task containing this `Header`. + /// + /// # Safety + /// + /// The provided raw pointer must point at the header of a task. + /// + /// The generic type S must be set to the correct scheduler type for this + /// task. + pub(super) unsafe fn get_scheduler(me: NonNull
) -> NonNull { + let offset = me.as_ref().vtable.scheduler_offset; + let scheduler = me.as_ptr().cast::().add(offset).cast::(); + NonNull::new_unchecked(scheduler) + } + + /// Gets a pointer to the id of the task containing this `Header`. + /// + /// # Safety + /// + /// The provided raw pointer must point at the header of a task. + pub(super) unsafe fn get_id_ptr(me: NonNull
) -> NonNull { + let offset = me.as_ref().vtable.id_offset; + let id = me.as_ptr().cast::().add(offset).cast::(); + NonNull::new_unchecked(id) + } + + /// Gets the id of the task containing this `Header`. + /// + /// # Safety + /// + /// The provided raw pointer must point at the header of a task. + pub(super) unsafe fn get_id(me: NonNull
) -> Id { + let ptr = Header::get_id_ptr(me).as_ptr(); + *ptr + } + + /// Gets the tracing id of the task containing this `Header`. + /// + /// # Safety + /// + /// The provided raw pointer must point at the header of a task. + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) unsafe fn get_tracing_id(me: &NonNull
) -> Option<&tracing::Id> { + me.as_ref().tracing_id.as_ref() + } } impl Trailer { diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 085aebe92ac..c0792979844 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -2,7 +2,7 @@ use crate::future::Future; use crate::runtime::task::core::{Cell, Core, Header, Trailer}; use crate::runtime::task::state::{Snapshot, State}; use crate::runtime::task::waker::waker_ref; -use crate::runtime::task::{JoinError, Notified, Schedule, Task}; +use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task}; use std::mem; use std::mem::ManuallyDrop; @@ -47,11 +47,102 @@ where } } +/// Task operations that can be implemented without being generic over the +/// scheduler or task. Only one version of these methods should exist in the +/// final binary. +impl RawTask { + pub(super) fn drop_reference(self) { + if self.state().ref_dec() { + self.dealloc(); + } + } + + /// This call consumes a ref-count and notifies the task. This will create a + /// new Notified and submit it if necessary. + /// + /// The caller does not need to hold a ref-count besides the one that was + /// passed to this call. + pub(super) fn wake_by_val(&self) { + use super::state::TransitionToNotifiedByVal; + + match self.state().transition_to_notified_by_val() { + TransitionToNotifiedByVal::Submit => { + // The caller has given us a ref-count, and the transition has + // created a new ref-count, so we now hold two. We turn the new + // ref-count Notified and pass it to the call to `schedule`. + // + // The old ref-count is retained for now to ensure that the task + // is not dropped during the call to `schedule` if the call + // drops the task it was given. + self.schedule(); + + // Now that we have completed the call to schedule, we can + // release our ref-count. + self.drop_reference(); + } + TransitionToNotifiedByVal::Dealloc => { + self.dealloc(); + } + TransitionToNotifiedByVal::DoNothing => {} + } + } + + /// This call notifies the task. It will not consume any ref-counts, but the + /// caller should hold a ref-count. This will create a new Notified and + /// submit it if necessary. + pub(super) fn wake_by_ref(&self) { + use super::state::TransitionToNotifiedByRef; + + match self.state().transition_to_notified_by_ref() { + TransitionToNotifiedByRef::Submit => { + // The transition above incremented the ref-count for a new task + // and the caller also holds a ref-count. The caller's ref-count + // ensures that the task is not destroyed even if the new task + // is dropped before `schedule` returns. + self.schedule(); + } + TransitionToNotifiedByRef::DoNothing => {} + } + } + + /// Remotely aborts the task. + /// + /// The caller should hold a ref-count, but we do not consume it. + /// + /// This is similar to `shutdown` except that it asks the runtime to perform + /// the shutdown. This is necessary to avoid the shutdown happening in the + /// wrong thread for non-Send tasks. + pub(super) fn remote_abort(&self) { + if self.state().transition_to_notified_and_cancel() { + // The transition has created a new ref-count, which we turn into + // a Notified and pass to the task. + // + // Since the caller holds a ref-count, the task cannot be destroyed + // before the call to `schedule` returns even if the call drops the + // `Notified` internally. + self.schedule(); + } + } + + /// Try to set the waker notified when the task is complete. Returns true if + /// the task has already completed. If this call returns false, then the + /// waker will not be notified. + pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool { + can_read_output(self.header(), self.trailer(), waker) + } +} + impl Harness where T: Future, S: Schedule, { + pub(super) fn drop_reference(self) { + if self.state().ref_dec() { + self.dealloc(); + } + } + /// Polls the inner future. A ref-count is consumed. /// /// All necessary state checks and transitions are performed. @@ -185,13 +276,6 @@ where } } - /// Try to set the waker notified when the task is complete. Returns true if - /// the task has already completed. If this call returns false, then the - /// waker will not be notified. - pub(super) fn try_set_join_waker(self, waker: &Waker) -> bool { - can_read_output(self.header(), self.trailer(), waker) - } - pub(super) fn drop_join_handle_slow(self) { // Try to unset `JOIN_INTEREST`. This must be done as a first step in // case the task concurrently completed. @@ -214,92 +298,6 @@ where self.drop_reference(); } - /// Remotely aborts the task. - /// - /// The caller should hold a ref-count, but we do not consume it. - /// - /// This is similar to `shutdown` except that it asks the runtime to perform - /// the shutdown. This is necessary to avoid the shutdown happening in the - /// wrong thread for non-Send tasks. - pub(super) fn remote_abort(self) { - if self.state().transition_to_notified_and_cancel() { - // The transition has created a new ref-count, which we turn into - // a Notified and pass to the task. - // - // Since the caller holds a ref-count, the task cannot be destroyed - // before the call to `schedule` returns even if the call drops the - // `Notified` internally. - self.core() - .scheduler - .schedule(Notified(self.get_new_task())); - } - } - - // ===== waker behavior ===== - - /// This call consumes a ref-count and notifies the task. This will create a - /// new Notified and submit it if necessary. - /// - /// The caller does not need to hold a ref-count besides the one that was - /// passed to this call. - pub(super) fn wake_by_val(self) { - use super::state::TransitionToNotifiedByVal; - - match self.state().transition_to_notified_by_val() { - TransitionToNotifiedByVal::Submit => { - // The caller has given us a ref-count, and the transition has - // created a new ref-count, so we now hold two. We turn the new - // ref-count Notified and pass it to the call to `schedule`. - // - // The old ref-count is retained for now to ensure that the task - // is not dropped during the call to `schedule` if the call - // drops the task it was given. - self.core() - .scheduler - .schedule(Notified(self.get_new_task())); - - // Now that we have completed the call to schedule, we can - // release our ref-count. - self.drop_reference(); - } - TransitionToNotifiedByVal::Dealloc => { - self.dealloc(); - } - TransitionToNotifiedByVal::DoNothing => {} - } - } - - /// This call notifies the task. It will not consume any ref-counts, but the - /// caller should hold a ref-count. This will create a new Notified and - /// submit it if necessary. - pub(super) fn wake_by_ref(&self) { - use super::state::TransitionToNotifiedByRef; - - match self.state().transition_to_notified_by_ref() { - TransitionToNotifiedByRef::Submit => { - // The transition above incremented the ref-count for a new task - // and the caller also holds a ref-count. The caller's ref-count - // ensures that the task is not destroyed even if the new task - // is dropped before `schedule` returns. - self.core() - .scheduler - .schedule(Notified(self.get_new_task())); - } - TransitionToNotifiedByRef::DoNothing => {} - } - } - - pub(super) fn drop_reference(self) { - if self.state().ref_dec() { - self.dealloc(); - } - } - - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(super) fn id(&self) -> Option<&tracing::Id> { - self.header().id.as_ref() - } - // ====== internal ====== /// Completes the task. This method assumes that the state is RUNNING. diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 21ef2f1ba8b..5660575504e 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -1,4 +1,4 @@ -use crate::runtime::task::{Id, RawTask}; +use crate::runtime::task::{Header, RawTask}; use std::fmt; use std::future::Future; @@ -154,8 +154,7 @@ cfg_rt! { /// [`std::thread::JoinHandle`]: std::thread::JoinHandle /// [`JoinError`]: crate::task::JoinError pub struct JoinHandle { - raw: Option, - id: Id, + raw: RawTask, _p: PhantomData, } } @@ -167,10 +166,9 @@ impl UnwindSafe for JoinHandle {} impl RefUnwindSafe for JoinHandle {} impl JoinHandle { - pub(super) fn new(raw: RawTask, id: Id) -> JoinHandle { + pub(super) fn new(raw: RawTask) -> JoinHandle { JoinHandle { - raw: Some(raw), - id, + raw, _p: PhantomData, } } @@ -209,9 +207,7 @@ impl JoinHandle { /// ``` /// [cancelled]: method@super::error::JoinError::is_cancelled pub fn abort(&self) { - if let Some(raw) = self.raw { - raw.remote_abort(); - } + self.raw.remote_abort(); } /// Checks if the task associated with this `JoinHandle` has finished. @@ -243,31 +239,22 @@ impl JoinHandle { /// ``` /// [`abort`]: method@JoinHandle::abort pub fn is_finished(&self) -> bool { - if let Some(raw) = self.raw { - let state = raw.header().state.load(); - state.is_complete() - } else { - true - } + let state = self.raw.header().state.load(); + state.is_complete() } /// Set the waker that is notified when the task completes. pub(crate) fn set_join_waker(&mut self, waker: &Waker) { - if let Some(raw) = self.raw { - if raw.try_set_join_waker(waker) { - // In this case the task has already completed. We wake the waker immediately. - waker.wake_by_ref(); - } + if self.raw.try_set_join_waker(waker) { + // In this case the task has already completed. We wake the waker immediately. + waker.wake_by_ref(); } } /// Returns a new `AbortHandle` that can be used to remotely abort this task. pub(crate) fn abort_handle(&self) -> super::AbortHandle { - let raw = self.raw.map(|raw| { - raw.ref_inc(); - raw - }); - super::AbortHandle::new(raw, self.id) + self.raw.ref_inc(); + super::AbortHandle::new(self.raw) } /// Returns a [task ID] that uniquely identifies this task relative to other @@ -282,7 +269,8 @@ impl JoinHandle { #[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn id(&self) -> super::Id { - self.id + // Safety: The header pointer is valid. + unsafe { Header::get_id(self.raw.header_ptr()) } } } @@ -297,13 +285,6 @@ impl Future for JoinHandle { // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); - // Raw should always be set. If it is not, this is due to polling after - // completion - let raw = self - .raw - .as_ref() - .expect("polling after `JoinHandle` already completed"); - // Try to read the task output. If the task is not yet complete, the // waker is stored and is notified once the task does complete. // @@ -316,7 +297,8 @@ impl Future for JoinHandle { // // The type of `T` must match the task's output type. unsafe { - raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); + self.raw + .try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); } if ret.is_ready() { @@ -329,13 +311,11 @@ impl Future for JoinHandle { impl Drop for JoinHandle { fn drop(&mut self) { - if let Some(raw) = self.raw.take() { - if raw.header().state.drop_join_handle_fast().is_ok() { - return; - } - - raw.drop_join_handle_slow(); + if self.raw.state().drop_join_handle_fast().is_ok() { + return; } + + self.raw.drop_join_handle_slow(); } } @@ -344,8 +324,9 @@ where T: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("JoinHandle") - .field("id", &self.id) - .finish() + // Safety: The header pointer is valid. + let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) }; + let id = unsafe { id_ptr.as_ref() }; + fmt.debug_struct("JoinHandle").field("id", id).finish() } } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index c31b110a476..06192b567d1 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -338,7 +338,7 @@ cfg_rt! { raw, _p: PhantomData, }); - let join = JoinHandle::new(raw, id); + let join = JoinHandle::new(raw); (task, notified, join) } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index a24ac44bf6f..b9700ae5ef5 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -14,45 +14,47 @@ pub(super) struct Vtable { /// Polls the future. pub(super) poll: unsafe fn(NonNull
), + /// Schedules the task for execution on the runtime. + pub(super) schedule: unsafe fn(NonNull
), + /// Deallocates the memory. pub(super) dealloc: unsafe fn(NonNull
), /// Reads the task output, if complete. pub(super) try_read_output: unsafe fn(NonNull
, *mut (), &Waker), - /// Try to set the waker notified when the task is complete. Returns true if - /// the task has already completed. If this call returns false, then the - /// waker will not be notified. - pub(super) try_set_join_waker: unsafe fn(NonNull
, &Waker) -> bool, - /// The join handle has been dropped. pub(super) drop_join_handle_slow: unsafe fn(NonNull
), /// An abort handle has been dropped. pub(super) drop_abort_handle: unsafe fn(NonNull
), - /// The task is remotely aborted. - pub(super) remote_abort: unsafe fn(NonNull
), - /// Scheduler is being shutdown. pub(super) shutdown: unsafe fn(NonNull
), /// The number of bytes that the `trailer` field is offset from the header. pub(super) trailer_offset: usize, + + /// The number of bytes that the `scheduler` field is offset from the header. + pub(super) scheduler_offset: usize, + + /// The number of bytes that the `id` field is offset from the header. + pub(super) id_offset: usize, } /// Get the vtable for the requested `T` and `S` generics. pub(super) fn vtable() -> &'static Vtable { &Vtable { poll: poll::, + schedule: schedule::, dealloc: dealloc::, try_read_output: try_read_output::, - try_set_join_waker: try_set_join_waker::, drop_join_handle_slow: drop_join_handle_slow::, drop_abort_handle: drop_abort_handle::, - remote_abort: remote_abort::, shutdown: shutdown::, - trailer_offset: TrailerOffsetHelper::::OFFSET, + trailer_offset: OffsetHelper::::TRAILER_OFFSET, + scheduler_offset: OffsetHelper::::SCHEDULER_OFFSET, + id_offset: OffsetHelper::::ID_OFFSET, } } @@ -61,17 +63,31 @@ pub(super) fn vtable() -> &'static Vtable { /// /// See this thread for more info: /// -struct TrailerOffsetHelper(T, S); -impl TrailerOffsetHelper { +struct OffsetHelper(T, S); +impl OffsetHelper { // Pass `size_of`/`align_of` as arguments rather than calling them directly // inside `get_trailer_offset` because trait bounds on generic parameters // of const fn are unstable on our MSRV. - const OFFSET: usize = get_trailer_offset( + const TRAILER_OFFSET: usize = get_trailer_offset( std::mem::size_of::
(), std::mem::size_of::>(), std::mem::align_of::>(), std::mem::align_of::(), ); + + // The `scheduler` is the first field of `Core`, so it has the same + // offset as `Core`. + const SCHEDULER_OFFSET: usize = get_core_offset( + std::mem::size_of::
(), + std::mem::align_of::>(), + ); + + const ID_OFFSET: usize = get_id_offset( + std::mem::size_of::
(), + std::mem::align_of::>(), + std::mem::size_of::(), + std::mem::align_of::(), + ); } /// Compute the offset of the `Trailer` field in `Cell` using the @@ -101,6 +117,44 @@ const fn get_trailer_offset( offset } +/// Compute the offset of the `Core` field in `Cell` using the +/// `#[repr(C)]` algorithm. +/// +/// Pseudo-code for the `#[repr(C)]` algorithm can be found here: +/// +const fn get_core_offset(header_size: usize, core_align: usize) -> usize { + let mut offset = header_size; + + let core_misalign = offset % core_align; + if core_misalign > 0 { + offset += core_align - core_misalign; + } + + offset +} + +/// Compute the offset of the `Id` field in `Cell` using the +/// `#[repr(C)]` algorithm. +/// +/// Pseudo-code for the `#[repr(C)]` algorithm can be found here: +/// +const fn get_id_offset( + header_size: usize, + core_align: usize, + scheduler_size: usize, + id_align: usize, +) -> usize { + let mut offset = get_core_offset(header_size, core_align); + offset += scheduler_size; + + let id_misalign = offset % id_align; + if id_misalign > 0 { + offset += id_align - id_misalign; + } + + offset +} + impl RawTask { pub(super) fn new(task: T, scheduler: S, id: Id) -> RawTask where @@ -121,19 +175,36 @@ impl RawTask { self.ptr } - /// Returns a reference to the task's meta structure. - /// - /// Safe as `Header` is `Sync`. + pub(super) fn trailer_ptr(&self) -> NonNull { + unsafe { Header::get_trailer(self.ptr) } + } + + /// Returns a reference to the task's header. pub(super) fn header(&self) -> &Header { unsafe { self.ptr.as_ref() } } + /// Returns a reference to the task's trailer. + pub(super) fn trailer(&self) -> &Trailer { + unsafe { &*self.trailer_ptr().as_ptr() } + } + + /// Returns a reference to the task's state. + pub(super) fn state(&self) -> &State { + &self.header().state + } + /// Safety: mutual exclusion is required to call this function. pub(super) fn poll(self) { let vtable = self.header().vtable; unsafe { (vtable.poll)(self.ptr) } } + pub(super) fn schedule(self) { + let vtable = self.header().vtable; + unsafe { (vtable.schedule)(self.ptr) } + } + pub(super) fn dealloc(self) { let vtable = self.header().vtable; unsafe { @@ -148,11 +219,6 @@ impl RawTask { (vtable.try_read_output)(self.ptr, dst, waker); } - pub(super) fn try_set_join_waker(self, waker: &Waker) -> bool { - let vtable = self.header().vtable; - unsafe { (vtable.try_set_join_waker)(self.ptr, waker) } - } - pub(super) fn drop_join_handle_slow(self) { let vtable = self.header().vtable; unsafe { (vtable.drop_join_handle_slow)(self.ptr) } @@ -168,11 +234,6 @@ impl RawTask { unsafe { (vtable.shutdown)(self.ptr) } } - pub(super) fn remote_abort(self) { - let vtable = self.header().vtable; - unsafe { (vtable.remote_abort)(self.ptr) } - } - /// Increment the task's reference count. /// /// Currently, this is used only when creating an `AbortHandle`. @@ -194,6 +255,15 @@ unsafe fn poll(ptr: NonNull
) { harness.poll(); } +unsafe fn schedule(ptr: NonNull
) { + use crate::runtime::task::{Notified, Task}; + + let scheduler = Header::get_scheduler::(ptr); + scheduler + .as_ref() + .schedule(Notified(Task::from_raw(ptr.cast()))); +} + unsafe fn dealloc(ptr: NonNull
) { let harness = Harness::::from_raw(ptr); harness.dealloc(); @@ -210,11 +280,6 @@ unsafe fn try_read_output( harness.try_read_output(out, waker); } -unsafe fn try_set_join_waker(ptr: NonNull
, waker: &Waker) -> bool { - let harness = Harness::::from_raw(ptr); - harness.try_set_join_waker(waker) -} - unsafe fn drop_join_handle_slow(ptr: NonNull
) { let harness = Harness::::from_raw(ptr); harness.drop_join_handle_slow() @@ -225,11 +290,6 @@ unsafe fn drop_abort_handle(ptr: NonNull
) { harness.drop_reference(); } -unsafe fn remote_abort(ptr: NonNull
) { - let harness = Harness::::from_raw(ptr); - harness.remote_abort() -} - unsafe fn shutdown(ptr: NonNull
) { let harness = Harness::::from_raw(ptr); harness.shutdown() diff --git a/tokio/src/runtime/task/waker.rs b/tokio/src/runtime/task/waker.rs index a434d5be481..b5f5ace9ece 100644 --- a/tokio/src/runtime/task/waker.rs +++ b/tokio/src/runtime/task/waker.rs @@ -1,6 +1,5 @@ use crate::future::Future; -use crate::runtime::task::harness::Harness; -use crate::runtime::task::{Header, Schedule}; +use crate::runtime::task::{Header, RawTask, Schedule}; use std::marker::PhantomData; use std::mem::ManuallyDrop; @@ -28,7 +27,7 @@ where // point and not an *owned* waker, we must ensure that `drop` is never // called on this waker instance. This is done by wrapping it with // `ManuallyDrop` and then never calling drop. - let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::(*header))) }; + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker(*header))) }; WakerRef { waker, @@ -46,8 +45,8 @@ impl ops::Deref for WakerRef<'_, S> { cfg_trace! { macro_rules! trace { - ($harness:expr, $op:expr) => { - if let Some(id) = $harness.id() { + ($header:expr, $op:expr) => { + if let Some(id) = Header::get_tracing_id(&$header) { tracing::trace!( target: "tokio::task::waker", op = $op, @@ -60,71 +59,46 @@ cfg_trace! { cfg_not_trace! { macro_rules! trace { - ($harness:expr, $op:expr) => { + ($header:expr, $op:expr) => { // noop - let _ = &$harness; + let _ = &$header; } } } -unsafe fn clone_waker(ptr: *const ()) -> RawWaker -where - T: Future, - S: Schedule, -{ - let header = ptr as *const Header; - let ptr = NonNull::new_unchecked(ptr as *mut Header); - let harness = Harness::::from_raw(ptr); - trace!(harness, "waker.clone"); - (*header).state.ref_inc(); - raw_waker::(ptr) +unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + let header = NonNull::new_unchecked(ptr as *mut Header); + trace!(header, "waker.clone"); + header.as_ref().state.ref_inc(); + raw_waker(header) } -unsafe fn drop_waker(ptr: *const ()) -where - T: Future, - S: Schedule, -{ +unsafe fn drop_waker(ptr: *const ()) { let ptr = NonNull::new_unchecked(ptr as *mut Header); - let harness = Harness::::from_raw(ptr); - trace!(harness, "waker.drop"); - harness.drop_reference(); + trace!(ptr, "waker.drop"); + let raw = RawTask::from_raw(ptr); + raw.drop_reference(); } -unsafe fn wake_by_val(ptr: *const ()) -where - T: Future, - S: Schedule, -{ +unsafe fn wake_by_val(ptr: *const ()) { let ptr = NonNull::new_unchecked(ptr as *mut Header); - let harness = Harness::::from_raw(ptr); - trace!(harness, "waker.wake"); - harness.wake_by_val(); + trace!(ptr, "waker.wake"); + let raw = RawTask::from_raw(ptr); + raw.wake_by_val(); } // Wake without consuming the waker -unsafe fn wake_by_ref(ptr: *const ()) -where - T: Future, - S: Schedule, -{ +unsafe fn wake_by_ref(ptr: *const ()) { let ptr = NonNull::new_unchecked(ptr as *mut Header); - let harness = Harness::::from_raw(ptr); - trace!(harness, "waker.wake_by_ref"); - harness.wake_by_ref(); + trace!(ptr, "waker.wake_by_ref"); + let raw = RawTask::from_raw(ptr); + raw.wake_by_ref(); } -fn raw_waker(header: NonNull
) -> RawWaker -where - T: Future, - S: Schedule, -{ +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker); + +fn raw_waker(header: NonNull
) -> RawWaker { let ptr = header.as_ptr() as *const (); - let vtable = &RawWakerVTable::new( - clone_waker::, - wake_by_val::, - wake_by_ref::, - drop_waker::, - ); - RawWaker::new(ptr, vtable) + RawWaker::new(ptr, &WAKER_VTABLE) }