Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move inject queue to tokio::runtime::task #3939

Merged
merged 6 commits into from Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
282 changes: 56 additions & 226 deletions tokio/src/runtime/queue.rs
@@ -1,13 +1,12 @@
//! Run-queue structures to support a work-stealing scheduler

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
use crate::runtime::task::{self, Inject};

use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// Producer handle. May only be used from a single thread.
Expand All @@ -18,19 +17,6 @@ pub(super) struct Local<T: 'static> {
/// Consumer handle. May be used from many threads.
pub(super) struct Steal<T: 'static>(Arc<Inner<T>>);

/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.
pub(super) struct Inject<T: 'static> {
/// Pointers to the head and tail of the queue
pointers: Mutex<Pointers>,

/// Number of pending tasks in the queue. This helps prevent unnecessary
/// locking in the hot path.
len: AtomicUsize,

_p: PhantomData<T>,
}

pub(super) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
Expand All @@ -49,24 +35,11 @@ pub(super) struct Inner<T: 'static> {
tail: AtomicU16,

/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
}

struct Pointers {
/// True if the queue is closed
is_closed: bool,

/// Linked-list head
head: Option<NonNull<task::Header>>,

/// Linked-list tail
tail: Option<NonNull<task::Header>>,
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess the other question re https://github.com/tokio-rs/tokio/pull/3939/files#r667212885 is, does this even need to be boxed anymore, if it has a fixed size? should we save another layer of indirection by removing the Box, or is there a reason for this to live in a separate allocation beyond sizedness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to, no. If I am to come up with a reason, then some allocators probably dislike allocations that are slightly larger than 1024 bytes. I don't know what effect this has in practice. I am open to removing the box.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess another reason is that long fixed size arrays are really awkward to construct for non-copy generic types.

}

unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
unsafe impl<T> Send for Inject<T> {}
unsafe impl<T> Sync for Inject<T> {}

#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;
Expand All @@ -79,6 +52,13 @@ const LOCAL_QUEUE_CAPACITY: usize = 4;

const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;

fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);

// SAFETY: We check that the length is correct.
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really necessary? is there a reason we can't just replace the Vec::new with a boxed array literal? if we can't do that, it would be nice to have a comment explaining why, IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the alternative:

Box::new([
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
    UnsafeCell::new(MaybeUninit::new()),
])

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, um. on second thought let's maybe not do that.


/// Create a new local run-queue
pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
Expand All @@ -90,7 +70,7 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let inner = Arc::new(Inner {
head: AtomicU32::new(0),
tail: AtomicU16::new(0),
buffer: buffer.into(),
buffer: make_fixed_size(buffer.into_boxed_slice()),
});

let local = Local {
Expand All @@ -109,10 +89,7 @@ impl<T> Local<T> {
}

/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>)
where
T: crate::runtime::task::Schedule,
{
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
Expand Down Expand Up @@ -179,9 +156,12 @@ impl<T> Local<T> {
tail: u16,
inject: &Inject<T>,
) -> Result<(), task::Notified<T>> {
const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
/// How many elements are we taking from the local queue.
///
/// This is one less than the number of tasks pushed to the inject
/// queue as we are also inserting the `task` argument.
const NUM_TASKS_TAKEN: u16 = (LOCAL_QUEUE_CAPACITY / 2) as u16;

let n = (LOCAL_QUEUE_CAPACITY / 2) as u16;
assert_eq!(
tail.wrapping_sub(head) as usize,
LOCAL_QUEUE_CAPACITY,
Expand All @@ -207,7 +187,10 @@ impl<T> Local<T> {
.head
.compare_exchange(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
pack(
head.wrapping_add(NUM_TASKS_TAKEN),
head.wrapping_add(NUM_TASKS_TAKEN),
),
Release,
Relaxed,
)
Expand All @@ -219,41 +202,41 @@ impl<T> Local<T> {
return Err(task);
}

// link the tasks
for i in 0..n {
let j = i + 1;

let i_idx = i.wrapping_add(head) as usize & MASK;
let j_idx = j.wrapping_add(head) as usize & MASK;

// Get the next pointer
let next = if j == n {
// The last task in the local queue being moved
task.header().into()
} else {
// safety: The above CAS prevents a stealer from accessing these
// tasks and we are the only producer.
self.inner.buffer[j_idx].with(|ptr| unsafe {
let value = (*ptr).as_ptr();
(*value).header().into()
})
};

// safety: the above CAS prevents a stealer from accessing these
// tasks and we are the only producer.
self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
let ptr = (*ptr).as_ptr();
(*ptr).header().set_next(Some(next))
});
/// An iterator the takes elements out of the run queue.
struct BatchTaskIter<'a, T: 'static> {
buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
head: u32,
i: u32,
}
impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
type Item = task::Notified<T>;

#[inline]
fn next(&mut self) -> Option<task::Notified<T>> {
if self.i == u32::from(NUM_TASKS_TAKEN) {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
None
} else {
let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
let slot = &self.buffer[i_idx];

// safety: Our CAS from before has assumed exclusive ownership
// of the task pointers in this range.
let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

self.i += 1;
Some(task)
}
}
}

// safety: the above CAS prevents a stealer from accessing these tasks
// and we are the only producer.
let head = self.inner.buffer[head as usize & MASK]
.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

// Push the tasks onto the inject queue
inject.push_batch(head, task, BATCH_LEN);
// safety: The CAS above ensures that no consumer will look at these
// values again, and we are the only producer.
let batch_iter = BatchTaskIter {
buffer: &*self.inner.buffer,
head: head as u32,
i: 0,
};
inject.push_batch(batch_iter.chain(std::iter::once(task)));

Ok(())
}
Expand Down Expand Up @@ -473,159 +456,6 @@ impl<T> Inner<T> {
}
}

impl<T: 'static> Inject<T> {
pub(super) fn new() -> Inject<T> {
Inject {
pointers: Mutex::new(Pointers {
is_closed: false,
head: None,
tail: None,
}),
len: AtomicUsize::new(0),
_p: PhantomData,
}
}

pub(super) fn is_empty(&self) -> bool {
self.len() == 0
}

/// Close the injection queue, returns `true` if the queue is open when the
/// transition is made.
pub(super) fn close(&self) -> bool {
let mut p = self.pointers.lock();

if p.is_closed {
return false;
}

p.is_closed = true;
true
}

pub(super) fn is_closed(&self) -> bool {
self.pointers.lock().is_closed
}

pub(super) fn len(&self) -> usize {
self.len.load(Acquire)
}

/// Pushes a value into the queue.
///
/// Returns `Err(task)` if pushing fails due to the queue being shutdown.
/// The caller is expected to call `shutdown()` on the task **if and only
/// if** it is a newly spawned task.
pub(super) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>>
where
T: crate::runtime::task::Schedule,
{
// Acquire queue lock
let mut p = self.pointers.lock();

if p.is_closed {
return Err(task);
}

// safety: only mutated with the lock held
let len = unsafe { self.len.unsync_load() };
let task = task.into_raw();

// The next pointer should already be null
debug_assert!(get_next(task).is_none());

if let Some(tail) = p.tail {
set_next(tail, Some(task));
} else {
p.head = Some(task);
}

p.tail = Some(task);

self.len.store(len + 1, Release);
Ok(())
}

pub(super) fn push_batch(
&self,
batch_head: task::Notified<T>,
batch_tail: task::Notified<T>,
num: usize,
) {
let batch_head = batch_head.into_raw();
let batch_tail = batch_tail.into_raw();

debug_assert!(get_next(batch_tail).is_none());

let mut p = self.pointers.lock();

if let Some(tail) = p.tail {
set_next(tail, Some(batch_head));
} else {
p.head = Some(batch_head);
}

p.tail = Some(batch_tail);

// Increment the count.
//
// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
let len = unsafe { self.len.unsync_load() };

self.len.store(len + num, Release);
}

pub(super) fn pop(&self) -> Option<task::Notified<T>> {
// Fast path, if len == 0, then there are no values
if self.is_empty() {
return None;
}

let mut p = self.pointers.lock();

// It is possible to hit null here if another thread popped the last
// task between us checking `len` and acquiring the lock.
let task = p.head?;

p.head = get_next(task);

if p.head.is_none() {
p.tail = None;
}

set_next(task, None);

// Decrement the count.
//
// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
self.len
.store(unsafe { self.len.unsync_load() } - 1, Release);

// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
}
}

impl<T: 'static> Drop for Inject<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
assert!(self.pop().is_none(), "queue not empty");
}
}
}

fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
}

fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
unsafe {
header.as_ref().set_next(val);
}
}

/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: u32) -> (u16, u16) {
Expand Down