runtime: move inject queue to tokio::runtime::task (#3939)
Darksonn committed Jul 12, 2021
1 parent 127983e commit 3b38ebd
Showing 6 changed files with 301 additions and 245 deletions.
tokio/src/runtime/queue.rs: 286 changes (60 additions, 226 deletions)
@@ -1,13 +1,12 @@
//! Run-queue structures to support a work-stealing scheduler

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
use crate::runtime::task::{self, Inject};

use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// Producer handle. May only be used from a single thread.
@@ -18,19 +17,6 @@ pub(super) struct Local<T: 'static> {
/// Consumer handle. May be used from many threads.
pub(super) struct Steal<T: 'static>(Arc<Inner<T>>);

/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.
pub(super) struct Inject<T: 'static> {
/// Pointers to the head and tail of the queue
pointers: Mutex<Pointers>,

/// Number of pending tasks in the queue. This helps prevent unnecessary
/// locking in the hot path.
len: AtomicUsize,

_p: PhantomData<T>,
}

pub(super) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
@@ -49,24 +35,11 @@ pub(super) struct Inner<T: 'static> {
tail: AtomicU16,

/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
}

struct Pointers {
/// True if the queue is closed
is_closed: bool,

/// Linked-list head
head: Option<NonNull<task::Header>>,

/// Linked-list tail
tail: Option<NonNull<task::Header>>,
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}

unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
unsafe impl<T> Send for Inject<T> {}
unsafe impl<T> Sync for Inject<T> {}
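The removed `Pointers` struct above is the backbone of an intrusive linked list: each task's header carries its own `next` pointer, so the inject queue needs no per-node allocation. A toy standalone sketch of that shape (plain structs standing in for tokio's `task::Header`; nodes are leaked to keep the sketch short):

```rust
use std::ptr::NonNull;

struct Header {
    next: Option<NonNull<Header>>,
    value: u32,
}

struct List {
    head: Option<NonNull<Header>>,
    tail: Option<NonNull<Header>>,
}

impl List {
    fn push(&mut self, node: NonNull<Header>) {
        match self.tail {
            // safety: tail points at a node we previously pushed and still own.
            Some(tail) => unsafe { (*tail.as_ptr()).next = Some(node) },
            None => self.head = Some(node),
        }
        self.tail = Some(node);
    }
}

fn main() {
    let mut list = List { head: None, tail: None };
    // Leaking keeps the example short; real code manages ownership carefully.
    list.push(NonNull::from(Box::leak(Box::new(Header { next: None, value: 1 }))));
    list.push(NonNull::from(Box::leak(Box::new(Header { next: None, value: 2 }))));

    let mut cur = list.head;
    while let Some(node) = cur {
        // safety: the leaked nodes stay alive for the program's duration.
        cur = unsafe {
            println!("{}", node.as_ref().value);
            node.as_ref().next
        };
    }
}
```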

#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;
@@ -79,6 +52,17 @@ const LOCAL_QUEUE_CAPACITY: usize = 4;

const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
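The mask only works because the capacity is a power of two: `index & MASK` is then equivalent to `index % LOCAL_QUEUE_CAPACITY` but compiles to a single AND. A quick check of that identity:

```rust
fn main() {
    const CAP: usize = 256;
    const MASK: usize = CAP - 1;
    for i in [0usize, 255, 256, 300, 511, 512] {
        // Power-of-two capacity makes the AND equivalent to a modulo.
        assert_eq!(i & MASK, i % CAP);
    }
}
```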

// Constructing the fixed size array directly is very awkward. The only way to
// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
// the contents are not Copy. The trick with defining a const doesn't work for
// generic types.
fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);

// safety: We check that the length is correct.
unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}
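The same conversion can be written generically with const generics (stable since Rust 1.51; a toolchain assumption, not what this commit uses). `Box::into_raw` yields a fat `*mut [T]`, and `.cast()` reinterprets it as a thin `*mut [T; N]`, discarding the length metadata; the assert is what makes that sound:

```rust
fn make_fixed_size_generic<T, const N: usize>(buffer: Box<[T]>) -> Box<[T; N]> {
    // This check is what makes the cast below sound.
    assert_eq!(buffer.len(), N);

    // safety: length verified above, so the array layout matches the slice.
    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}

fn main() {
    let fixed: Box<[u32; 4]> = make_fixed_size_generic(vec![7u32; 4].into_boxed_slice());
    assert_eq!(fixed.len(), 4);
}
```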

/// Create a new local run-queue
pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
@@ -90,7 +74,7 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let inner = Arc::new(Inner {
head: AtomicU32::new(0),
tail: AtomicU16::new(0),
buffer: buffer.into(),
buffer: make_fixed_size(buffer.into_boxed_slice()),
});

let local = Local {
@@ -109,10 +93,7 @@ impl<T> Local<T> {
}

/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>)
where
T: crate::runtime::task::Schedule,
{
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
@@ -179,9 +160,12 @@ impl<T> Local<T> {
tail: u16,
inject: &Inject<T>,
) -> Result<(), task::Notified<T>> {
const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
/// How many elements are we taking from the local queue.
///
/// This is one less than the number of tasks pushed to the inject
/// queue as we are also inserting the `task` argument.
const NUM_TASKS_TAKEN: u16 = (LOCAL_QUEUE_CAPACITY / 2) as u16;

let n = (LOCAL_QUEUE_CAPACITY / 2) as u16;
assert_eq!(
tail.wrapping_sub(head) as usize,
LOCAL_QUEUE_CAPACITY,
@@ -207,7 +191,10 @@ impl<T> Local<T> {
.head
.compare_exchange(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
pack(
head.wrapping_add(NUM_TASKS_TAKEN),
head.wrapping_add(NUM_TASKS_TAKEN),
),
Release,
Relaxed,
)
@@ -219,41 +206,41 @@ impl<T> Local<T> {
return Err(task);
}

// link the tasks
for i in 0..n {
let j = i + 1;

let i_idx = i.wrapping_add(head) as usize & MASK;
let j_idx = j.wrapping_add(head) as usize & MASK;

// Get the next pointer
let next = if j == n {
// The last task in the local queue being moved
task.header().into()
} else {
// safety: The above CAS prevents a stealer from accessing these
// tasks and we are the only producer.
self.inner.buffer[j_idx].with(|ptr| unsafe {
let value = (*ptr).as_ptr();
(*value).header().into()
})
};

// safety: the above CAS prevents a stealer from accessing these
// tasks and we are the only producer.
self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
let ptr = (*ptr).as_ptr();
(*ptr).header().set_next(Some(next))
});
/// An iterator that takes elements out of the run queue.
struct BatchTaskIter<'a, T: 'static> {
buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
head: u32,
i: u32,
}
impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
type Item = task::Notified<T>;

#[inline]
fn next(&mut self) -> Option<task::Notified<T>> {
if self.i == u32::from(NUM_TASKS_TAKEN) {
None
} else {
let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
let slot = &self.buffer[i_idx];

// safety: Our CAS from before has assumed exclusive ownership
// of the task pointers in this range.
let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

self.i += 1;
Some(task)
}
}
}

// safety: the above CAS prevents a stealer from accessing these tasks
// and we are the only producer.
let head = self.inner.buffer[head as usize & MASK]
.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

// Push the tasks onto the inject queue
inject.push_batch(head, task, BATCH_LEN);
// safety: The CAS above ensures that no consumer will look at these
// values again, and we are the only producer.
let batch_iter = BatchTaskIter {
buffer: &*self.inner.buffer,
head: head as u32,
i: 0,
};
inject.push_batch(batch_iter.chain(std::iter::once(task)));

Ok(())
}
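The batch handed to `inject.push_batch` is the drained half of the ring chained with the overflowing task itself, which is why `NUM_TASKS_TAKEN` is one less than the batch size: with `LOCAL_QUEUE_CAPACITY = 256`, 128 tasks come out of the ring and 129 go to the inject queue. A toy model of that iterator pattern (plain integers standing in for tasks, a fixed array for the ring):

```rust
struct DrainHalf<'a> {
    buf: &'a [u32],
    head: usize,
    taken: usize,
    limit: usize,
}

impl<'a> Iterator for DrainHalf<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.taken == self.limit {
            None
        } else {
            // Wrap around the ring, mirroring `i.wrapping_add(head) & MASK`.
            let item = self.buf[(self.head + self.taken) % self.buf.len()];
            self.taken += 1;
            Some(item)
        }
    }
}

fn main() {
    let ring = [10, 11, 12, 13];
    let drain = DrainHalf { buf: &ring, head: 2, taken: 0, limit: 2 };
    // `99` stands in for the `task` argument that triggered the overflow.
    let batch: Vec<u32> = drain.chain(std::iter::once(99)).collect();
    assert_eq!(batch, vec![12, 13, 99]);
}
```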
@@ -473,159 +460,6 @@ impl<T> Inner<T> {
}
}

impl<T: 'static> Inject<T> {
pub(super) fn new() -> Inject<T> {
Inject {
pointers: Mutex::new(Pointers {
is_closed: false,
head: None,
tail: None,
}),
len: AtomicUsize::new(0),
_p: PhantomData,
}
}

pub(super) fn is_empty(&self) -> bool {
self.len() == 0
}

/// Close the injection queue, returns `true` if the queue is open when the
/// transition is made.
pub(super) fn close(&self) -> bool {
let mut p = self.pointers.lock();

if p.is_closed {
return false;
}

p.is_closed = true;
true
}

pub(super) fn is_closed(&self) -> bool {
self.pointers.lock().is_closed
}

pub(super) fn len(&self) -> usize {
self.len.load(Acquire)
}

/// Pushes a value into the queue.
///
/// Returns `Err(task)` if pushing fails due to the queue being shutdown.
/// The caller is expected to call `shutdown()` on the task **if and only
/// if** it is a newly spawned task.
pub(super) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>>
where
T: crate::runtime::task::Schedule,
{
// Acquire queue lock
let mut p = self.pointers.lock();

if p.is_closed {
return Err(task);
}

// safety: only mutated with the lock held
let len = unsafe { self.len.unsync_load() };
let task = task.into_raw();

// The next pointer should already be null
debug_assert!(get_next(task).is_none());

if let Some(tail) = p.tail {
set_next(tail, Some(task));
} else {
p.head = Some(task);
}

p.tail = Some(task);

self.len.store(len + 1, Release);
Ok(())
}

pub(super) fn push_batch(
&self,
batch_head: task::Notified<T>,
batch_tail: task::Notified<T>,
num: usize,
) {
let batch_head = batch_head.into_raw();
let batch_tail = batch_tail.into_raw();

debug_assert!(get_next(batch_tail).is_none());

let mut p = self.pointers.lock();

if let Some(tail) = p.tail {
set_next(tail, Some(batch_head));
} else {
p.head = Some(batch_head);
}

p.tail = Some(batch_tail);

// Increment the count.
//
// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
let len = unsafe { self.len.unsync_load() };

self.len.store(len + num, Release);
}

pub(super) fn pop(&self) -> Option<task::Notified<T>> {
// Fast path, if len == 0, then there are no values
if self.is_empty() {
return None;
}

let mut p = self.pointers.lock();

// It is possible to hit null here if another thread popped the last
// task between us checking `len` and acquiring the lock.
let task = p.head?;

p.head = get_next(task);

if p.head.is_none() {
p.tail = None;
}

set_next(task, None);

// Decrement the count.
//
// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
self.len
.store(unsafe { self.len.unsync_load() } - 1, Release);

// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
}
}

impl<T: 'static> Drop for Inject<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
assert!(self.pop().is_none(), "queue not empty");
}
}
}
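The `len` field pattern in the removed `Inject` (mutex-guarded writes, lock-free reads) is what makes the empty fast path in `pop` cheap. A condensed sketch with std types (tokio's loom wrappers additionally expose `unsync_load`, a plain non-atomic read that is sound because every write happens under the lock):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Release}};
use std::sync::Mutex;

struct Queue {
    items: Mutex<Vec<u32>>,
    len: AtomicUsize,
}

impl Queue {
    fn push(&self, item: u32) {
        let mut items = self.items.lock().unwrap();
        items.push(item);
        // `len` is only stored to while the lock is held, so this
        // load/store pair cannot race with another writer.
        let len = self.len.load(Acquire);
        self.len.store(len + 1, Release);
    }

    /// Lock-free fast path: callers skip the mutex when the queue is empty.
    fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }
}

fn main() {
    let q = Queue { items: Mutex::new(Vec::new()), len: AtomicUsize::new(0) };
    assert!(q.is_empty());
    q.push(1);
    assert!(!q.is_empty());
}
```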

fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
}

fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
unsafe {
header.as_ref().set_next(val);
}
}

/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: u32) -> (u16, u16) {

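The body of `unpack` is cut off by the diff view above. Assuming the layout the types imply (stealer index in the high 16 bits, real head in the low 16 bits, matching the `AtomicU32` head and `u16` halves used throughout this file), the pair of helpers would look like this sketch:

```rust
// Pack the stealer index and real head into one u32 (assumed layout).
fn pack(steal: u16, real: u16) -> u32 {
    (real as u32) | ((steal as u32) << 16)
}

// Split the packed head back into (stealer index, real head).
fn unpack(n: u32) -> (u16, u16) {
    let steal = (n >> 16) as u16;
    let real = (n & u16::MAX as u32) as u16;
    (steal, real)
}

fn main() {
    assert_eq!(unpack(pack(3, 7)), (3, 7));
}
```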