From 3b38ebd7f5d50611193f942c31c57262f263be33 Mon Sep 17 00:00:00 2001
From: Alice Ryhl
Date: Mon, 12 Jul 2021 10:31:36 +0200
Subject: [PATCH] runtime: move inject queue to `tokio::runtime::task` (#3939)

---
 tokio/src/runtime/queue.rs              | 286 +++++-------------------
 tokio/src/runtime/task/inject.rs        | 221 ++++++++++++++++++
 tokio/src/runtime/task/mod.rs           |   9 +-
 tokio/src/runtime/tests/loom_queue.rs   |  10 +-
 tokio/src/runtime/tests/queue.rs        |  12 +-
 tokio/src/runtime/thread_pool/worker.rs |   8 +-
 6 files changed, 301 insertions(+), 245 deletions(-)
 create mode 100644 tokio/src/runtime/task/inject.rs

diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index 818ef7bac17..6e91dfa2363 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -1,13 +1,12 @@
 //! Run-queue structures to support a work-stealing scheduler
 
 use crate::loom::cell::UnsafeCell;
-use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
-use crate::loom::sync::{Arc, Mutex};
-use crate::runtime::task;
+use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
+use crate::loom::sync::Arc;
+use crate::runtime::task::{self, Inject};
 
-use std::marker::PhantomData;
 use std::mem::MaybeUninit;
-use std::ptr::{self, NonNull};
+use std::ptr;
 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
 
 /// Producer handle. May only be used from a single thread.
 pub(super) struct Local<T: 'static> {
@@ -18,19 +17,6 @@ pub(super) struct Steal<T: 'static>(Arc<Inner<T>>);
 /// Consumer handle. May be used from many threads.
 pub(super) struct Steal<T: 'static>(Arc<Inner<T>>);
 
-/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
-/// overflow queue when the local, fixed-size, array queue overflows.
-pub(super) struct Inject<T: 'static> {
-    /// Pointers to the head and tail of the queue
-    pointers: Mutex<Pointers>,
-
-    /// Number of pending tasks in the queue. This helps prevent unnecessary
-    /// locking in the hot path.
-    len: AtomicUsize,
-
-    _p: PhantomData<T>,
-}
-
 pub(super) struct Inner<T: 'static> {
     /// Concurrently updated by many threads.
     ///
@@ -49,24 +35,11 @@ pub(super) struct Inner<T: 'static> {
     tail: AtomicU16,
 
     /// Elements
-    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
-}
-
-struct Pointers {
-    /// True if the queue is closed
-    is_closed: bool,
-
-    /// Linked-list head
-    head: Option<NonNull<task::Header>>,
-
-    /// Linked-list tail
-    tail: Option<NonNull<task::Header>>,
+    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
 }
 
 unsafe impl<T> Send for Inner<T> {}
 unsafe impl<T> Sync for Inner<T> {}
-unsafe impl<T> Send for Inject<T> {}
-unsafe impl<T> Sync for Inject<T> {}
 
 #[cfg(not(loom))]
 const LOCAL_QUEUE_CAPACITY: usize = 256;
@@ -79,6 +52,17 @@ const LOCAL_QUEUE_CAPACITY: usize = 4;
 
 const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
 
+// Constructing the fixed size array directly is very awkward. The only way to
+// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
+// the contents are not Copy. The trick with defining a const doesn't work for
+// generic types.
+fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
+    assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);
+
+    // safety: We check that the length is correct.
+    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
+}
+
 /// Create a new local run-queue
 pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
     let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
@@ -90,7 +74,7 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
     let inner = Arc::new(Inner {
         head: AtomicU32::new(0),
         tail: AtomicU16::new(0),
-        buffer: buffer.into(),
+        buffer: make_fixed_size(buffer.into_boxed_slice()),
     });
 
     let local = Local {
@@ -109,10 +93,7 @@ impl<T> Local<T> {
     }
 
     /// Pushes a task to the back of the local queue, skipping the LIFO slot.
-    pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>)
-    where
-        T: crate::runtime::task::Schedule,
-    {
+    pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
         let tail = loop {
             let head = self.inner.head.load(Acquire);
             let (steal, real) = unpack(head);
@@ -179,9 +160,12 @@ impl<T> Local<T> {
         tail: u16,
         inject: &Inject<T>,
     ) -> Result<(), task::Notified<T>> {
-        const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
+        /// How many elements are we taking from the local queue.
+        ///
+        /// This is one less than the number of tasks pushed to the inject
+        /// queue as we are also inserting the `task` argument.
+        const NUM_TASKS_TAKEN: u16 = (LOCAL_QUEUE_CAPACITY / 2) as u16;
 
-        let n = (LOCAL_QUEUE_CAPACITY / 2) as u16;
         assert_eq!(
             tail.wrapping_sub(head) as usize,
             LOCAL_QUEUE_CAPACITY,
@@ -207,7 +191,10 @@ impl<T> Local<T> {
             .head
             .compare_exchange(
                 prev,
-                pack(head.wrapping_add(n), head.wrapping_add(n)),
+                pack(
+                    head.wrapping_add(NUM_TASKS_TAKEN),
+                    head.wrapping_add(NUM_TASKS_TAKEN),
+                ),
                 Release,
                 Relaxed,
             )
@@ -219,41 +206,41 @@ impl<T> Local<T> {
             return Err(task);
         }
 
-        // link the tasks
-        for i in 0..n {
-            let j = i + 1;
-
-            let i_idx = i.wrapping_add(head) as usize & MASK;
-            let j_idx = j.wrapping_add(head) as usize & MASK;
-
-            // Get the next pointer
-            let next = if j == n {
-                // The last task in the local queue being moved
-                task.header().into()
-            } else {
-                // safety: The above CAS prevents a stealer from accessing these
-                // tasks and we are the only producer.
-                self.inner.buffer[j_idx].with(|ptr| unsafe {
-                    let value = (*ptr).as_ptr();
-                    (*value).header().into()
-                })
-            };
-
-            // safety: the above CAS prevents a stealer from accessing these
-            // tasks and we are the only producer.
-            self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
-                let ptr = (*ptr).as_ptr();
-                (*ptr).header().set_next(Some(next))
-            });
+        /// An iterator that takes elements out of the run queue.
+        struct BatchTaskIter<'a, T: 'static> {
+            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
+            head: u32,
+            i: u32,
+        }
+        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
+            type Item = task::Notified<T>;
+
+            #[inline]
+            fn next(&mut self) -> Option<task::Notified<T>> {
+                if self.i == u32::from(NUM_TASKS_TAKEN) {
+                    None
+                } else {
+                    let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
+                    let slot = &self.buffer[i_idx];
+
+                    // safety: Our CAS from before has assumed exclusive ownership
+                    // of the task pointers in this range.
+                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+                    self.i += 1;
+                    Some(task)
+                }
+            }
         }
 
-        // safety: the above CAS prevents a stealer from accessing these tasks
-        // and we are the only producer.
-        let head = self.inner.buffer[head as usize & MASK]
-            .with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
-
-        // Push the tasks onto the inject queue
-        inject.push_batch(head, task, BATCH_LEN);
+        // safety: The CAS above ensures that no consumer will look at these
+        // values again, and we are the only producer.
+        let batch_iter = BatchTaskIter {
+            buffer: &*self.inner.buffer,
+            head: head as u32,
+            i: 0,
+        };
+        inject.push_batch(batch_iter.chain(std::iter::once(task)));
 
         Ok(())
     }
@@ -473,159 +460,6 @@ impl<T> Inner<T> {
     }
 }
 
-impl<T: 'static> Inject<T> {
-    pub(super) fn new() -> Inject<T> {
-        Inject {
-            pointers: Mutex::new(Pointers {
-                is_closed: false,
-                head: None,
-                tail: None,
-            }),
-            len: AtomicUsize::new(0),
-            _p: PhantomData,
-        }
-    }
-
-    pub(super) fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    /// Close the injection queue, returns `true` if the queue is open when the
-    /// transition is made.
-    pub(super) fn close(&self) -> bool {
-        let mut p = self.pointers.lock();
-
-        if p.is_closed {
-            return false;
-        }
-
-        p.is_closed = true;
-        true
-    }
-
-    pub(super) fn is_closed(&self) -> bool {
-        self.pointers.lock().is_closed
-    }
-
-    pub(super) fn len(&self) -> usize {
-        self.len.load(Acquire)
-    }
-
-    /// Pushes a value into the queue.
-    ///
-    /// Returns `Err(task)` if pushing fails due to the queue being shutdown.
-    /// The caller is expected to call `shutdown()` on the task **if and only
-    /// if** it is a newly spawned task.
-    pub(super) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>>
-    where
-        T: crate::runtime::task::Schedule,
-    {
-        // Acquire queue lock
-        let mut p = self.pointers.lock();
-
-        if p.is_closed {
-            return Err(task);
-        }
-
-        // safety: only mutated with the lock held
-        let len = unsafe { self.len.unsync_load() };
-        let task = task.into_raw();
-
-        // The next pointer should already be null
-        debug_assert!(get_next(task).is_none());
-
-        if let Some(tail) = p.tail {
-            set_next(tail, Some(task));
-        } else {
-            p.head = Some(task);
-        }
-
-        p.tail = Some(task);
-
-        self.len.store(len + 1, Release);
-        Ok(())
-    }
-
-    pub(super) fn push_batch(
-        &self,
-        batch_head: task::Notified<T>,
-        batch_tail: task::Notified<T>,
-        num: usize,
-    ) {
-        let batch_head = batch_head.into_raw();
-        let batch_tail = batch_tail.into_raw();
-
-        debug_assert!(get_next(batch_tail).is_none());
-
-        let mut p = self.pointers.lock();
-
-        if let Some(tail) = p.tail {
-            set_next(tail, Some(batch_head));
-        } else {
-            p.head = Some(batch_head);
-        }
-
-        p.tail = Some(batch_tail);
-
-        // Increment the count.
-        //
-        // safety: All updates to the len atomic are guarded by the mutex. As
-        // such, a non-atomic load followed by a store is safe.
-        let len = unsafe { self.len.unsync_load() };
-
-        self.len.store(len + num, Release);
-    }
-
-    pub(super) fn pop(&self) -> Option<task::Notified<T>> {
-        // Fast path, if len == 0, then there are no values
-        if self.is_empty() {
-            return None;
-        }
-
-        let mut p = self.pointers.lock();
-
-        // It is possible to hit null here if another thread popped the last
-        // task between us checking `len` and acquiring the lock.
-        let task = p.head?;
-
-        p.head = get_next(task);
-
-        if p.head.is_none() {
-            p.tail = None;
-        }
-
-        set_next(task, None);
-
-        // Decrement the count.
-        //
-        // safety: All updates to the len atomic are guarded by the mutex. As
-        // such, a non-atomic load followed by a store is safe.
-        self.len
-            .store(unsafe { self.len.unsync_load() } - 1, Release);
-
-        // safety: a `Notified` is pushed into the queue and now it is popped!
-        Some(unsafe { task::Notified::from_raw(task) })
-    }
-}
-
-impl<T: 'static> Drop for Inject<T> {
-    fn drop(&mut self) {
-        if !std::thread::panicking() {
-            assert!(self.pop().is_none(), "queue not empty");
-        }
-    }
-}
-
-fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
-    unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
-}
-
-fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
-    unsafe {
-        header.as_ref().set_next(val);
-    }
-}
-
 /// Split the head value into the real head and the index a stealer is working
 /// on.
 fn unpack(n: u32) -> (u16, u16) {
diff --git a/tokio/src/runtime/task/inject.rs b/tokio/src/runtime/task/inject.rs
new file mode 100644
index 00000000000..8ca3187a722
--- /dev/null
+++ b/tokio/src/runtime/task/inject.rs
@@ -0,0 +1,221 @@
+//! Inject queue used to send wakeups to a work-stealing scheduler
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Mutex;
+use crate::runtime::task;
+
+use std::marker::PhantomData;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering::{Acquire, Release};
+
+/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
+/// overflow queue when the local, fixed-size, array queue overflows.
+pub(crate) struct Inject<T: 'static> {
+    /// Pointers to the head and tail of the queue
+    pointers: Mutex<Pointers>,
+
+    /// Number of pending tasks in the queue. This helps prevent unnecessary
+    /// locking in the hot path.
+    len: AtomicUsize,
+
+    _p: PhantomData<T>,
+}
+
+struct Pointers {
+    /// True if the queue is closed
+    is_closed: bool,
+
+    /// Linked-list head
+    head: Option<NonNull<task::Header>>,
+
+    /// Linked-list tail
+    tail: Option<NonNull<task::Header>>,
+}
+
+unsafe impl<T> Send for Inject<T> {}
+unsafe impl<T> Sync for Inject<T> {}
+
+impl<T: 'static> Inject<T> {
+    pub(crate) fn new() -> Inject<T> {
+        Inject {
+            pointers: Mutex::new(Pointers {
+                is_closed: false,
+                head: None,
+                tail: None,
+            }),
+            len: AtomicUsize::new(0),
+            _p: PhantomData,
+        }
+    }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Close the injection queue, returns `true` if the queue is open when the
+    /// transition is made.
+    pub(crate) fn close(&self) -> bool {
+        let mut p = self.pointers.lock();
+
+        if p.is_closed {
+            return false;
+        }
+
+        p.is_closed = true;
+        true
+    }
+
+    pub(crate) fn is_closed(&self) -> bool {
+        self.pointers.lock().is_closed
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.len.load(Acquire)
+    }
+
+    /// Pushes a value into the queue.
+    ///
+    /// Returns `Err(task)` if pushing fails due to the queue being shutdown.
+    /// The caller is expected to call `shutdown()` on the task **if and only
+    /// if** it is a newly spawned task.
+    pub(crate) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>> {
+        // Acquire queue lock
+        let mut p = self.pointers.lock();
+
+        if p.is_closed {
+            return Err(task);
+        }
+
+        // safety: only mutated with the lock held
+        let len = unsafe { self.len.unsync_load() };
+        let task = task.into_raw();
+
+        // The next pointer should already be null
+        debug_assert!(get_next(task).is_none());
+
+        if let Some(tail) = p.tail {
+            set_next(tail, Some(task));
+        } else {
+            p.head = Some(task);
+        }
+
+        p.tail = Some(task);
+
+        self.len.store(len + 1, Release);
+        Ok(())
+    }
+
+    /// Pushes several values into the queue.
+    ///
+    /// SAFETY: The caller should ensure that we have exclusive access to the
+    /// `queue_next` field in the provided tasks.
+    #[inline]
+    pub(crate) fn push_batch<I>(&self, mut iter: I)
+    where
+        I: Iterator<Item = task::Notified<T>>,
+    {
+        let first = match iter.next() {
+            Some(first) => first.into_raw(),
+            None => return,
+        };
+
+        // Link up all the tasks.
+        let mut prev = first;
+        let mut counter = 1;
+
+        // We are going to be called with an `std::iter::Chain`, and that
+        // iterator overrides `for_each` to something that is easier for the
+        // compiler to optimize than a loop.
+        iter.map(|next| next.into_raw()).for_each(|next| {
+            // safety: The caller guarantees exclusive access to this field.
+            set_next(prev, Some(next));
+            prev = next;
+            counter += 1;
+        });
+
+        // Now that the tasks are linked together, insert them into the
+        // linked list.
+        self.push_batch_inner(first, prev, counter);
+    }
+
+    /// Insert several tasks that have been linked together into the queue.
+    ///
+    /// The provided head and tail may be the same task. In this case, a
+    /// single task is inserted.
+    #[inline]
+    fn push_batch_inner(
+        &self,
+        batch_head: NonNull<task::Header>,
+        batch_tail: NonNull<task::Header>,
+        num: usize,
+    ) {
+        debug_assert!(get_next(batch_tail).is_none());
+
+        let mut p = self.pointers.lock();
+
+        if let Some(tail) = p.tail {
+            set_next(tail, Some(batch_head));
+        } else {
+            p.head = Some(batch_head);
+        }
+
+        p.tail = Some(batch_tail);
+
+        // Increment the count.
+        //
+        // safety: All updates to the len atomic are guarded by the mutex. As
+        // such, a non-atomic load followed by a store is safe.
+        let len = unsafe { self.len.unsync_load() };
+
+        self.len.store(len + num, Release);
+    }
+
+    pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
+        // Fast path, if len == 0, then there are no values
+        if self.is_empty() {
+            return None;
+        }
+
+        let mut p = self.pointers.lock();
+
+        // It is possible to hit null here if another thread popped the last
+        // task between us checking `len` and acquiring the lock.
+        let task = p.head?;
+
+        p.head = get_next(task);
+
+        if p.head.is_none() {
+            p.tail = None;
+        }
+
+        set_next(task, None);
+
+        // Decrement the count.
+        //
+        // safety: All updates to the len atomic are guarded by the mutex. As
+        // such, a non-atomic load followed by a store is safe.
+        self.len
+            .store(unsafe { self.len.unsync_load() } - 1, Release);
+
+        // safety: a `Notified` is pushed into the queue and now it is popped!
+        Some(unsafe { task::Notified::from_raw(task) })
+    }
+}
+
+impl<T: 'static> Drop for Inject<T> {
+    fn drop(&mut self) {
+        if !std::thread::panicking() {
+            assert!(self.pop().is_none(), "queue not empty");
+        }
+    }
+}
+
+fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
+    unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
+}
+
+fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
+    unsafe {
+        header.as_ref().set_next(val);
+    }
+}
diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index 6b1b8c63886..a4d4146fe25 100644
--- a/tokio/src/runtime/task/mod.rs
+++ b/tokio/src/runtime/task/mod.rs
@@ -9,6 +9,11 @@ pub use self::error::JoinError;
 mod harness;
 use self::harness::Harness;
 
+cfg_rt_multi_thread! {
+    mod inject;
+    pub(super) use self::inject::Inject;
+}
+
 mod join;
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::join::JoinHandle;
@@ -134,10 +139,6 @@ cfg_rt_multi_thread! {
         pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
             Notified(Task::from_raw(ptr))
         }
-
-        pub(crate) fn header(&self) -> &Header {
-            self.0.header()
-        }
     }
 
     impl<S: 'static> Task<S> {
diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs
index 34da7fd66a3..977c9159b26 100644
--- a/tokio/src/runtime/tests/loom_queue.rs
+++ b/tokio/src/runtime/tests/loom_queue.rs
@@ -1,5 +1,5 @@
 use crate::runtime::queue;
-use crate::runtime::task::{self, Schedule, Task};
+use crate::runtime::task::{self, Inject, Schedule, Task};
 
 use loom::thread;
 
@@ -7,7 +7,7 @@ fn basic() {
     loom::model(|| {
         let (steal, mut local) = queue::local();
-        let inject = queue::Inject::new();
+        let inject = Inject::new();
 
         let th = thread::spawn(move || {
             let (_, mut local) = queue::local();
@@ -61,7 +61,7 @@ fn steal_overflow() {
     loom::model(|| {
         let (steal, mut local) = queue::local();
-        let inject = queue::Inject::new();
+        let inject = Inject::new();
 
         let th = thread::spawn(move || {
             let (_, mut local) = queue::local();
@@ -129,7 +129,7 @@ fn multi_stealer() {
 
     loom::model(|| {
         let (steal, mut local) = queue::local();
-        let inject = queue::Inject::new();
+        let inject = Inject::new();
 
         // Push work
         for _ in 0..NUM_TASKS {
@@ -166,7 +166,7 @@ fn chained_steal() {
     loom::model(|| {
         let (s1, mut l1) = queue::local();
        let (s2, mut l2) = queue::local();
-        let inject = queue::Inject::new();
+        let inject = Inject::new();
 
         // Load up some tasks
         for _ in 0..4 {
diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs
index b2962f154e0..e08dc6d99e6 100644
--- a/tokio/src/runtime/tests/queue.rs
+++ b/tokio/src/runtime/tests/queue.rs
@@ -1,5 +1,5 @@
 use crate::runtime::queue;
-use crate::runtime::task::{self, Schedule, Task};
+use crate::runtime::task::{self, Inject, Schedule, Task};
 
 use std::thread;
 use std::time::Duration;
@@ -7,7 +7,7 @@
 #[test]
 fn fits_256() {
     let (_, mut local) = queue::local();
-    let inject = queue::Inject::new();
+    let inject = Inject::new();
 
     for _ in 0..256 {
         let (task, _) = super::joinable::<_, Runtime>(async {});
@@ -22,7 +22,7 @@ fn fits_256() {
 #[test]
 fn overflow() {
     let (_, mut local) = queue::local();
-    let inject = queue::Inject::new();
+    let inject = Inject::new();
 
     for _ in 0..257 {
         let (task, _) = super::joinable::<_, Runtime>(async {});
@@ -46,7 +46,7 @@ fn overflow() {
 fn steal_batch() {
     let (steal1, mut local1) = queue::local();
     let (_, mut local2) = queue::local();
-    let inject = queue::Inject::new();
+    let inject = Inject::new();
 
     for _ in 0..4 {
         let (task, _) = super::joinable::<_, Runtime>(async {});
@@ -78,7 +78,7 @@ fn stress1() {
 
     for _ in 0..NUM_ITER {
         let (steal, mut local) = queue::local();
-        let inject = queue::Inject::new();
+        let inject = Inject::new();
 
         let th = thread::spawn(move || {
             let (_, mut local) = queue::local();
@@ -134,7 +134,7 @@ fn stress2() {
 
     for _ in 0..NUM_ITER {
         let (steal, mut local) = queue::local();
-        let inject = queue::Inject::new();
+        let inject = Inject::new();
 
         let th = thread::spawn(move || {
             let (_, mut local) = queue::local();
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 4ae0f5a2592..e91e2ea4b34 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -11,7 +11,7 @@ use crate::park::{Park, Unpark};
 use crate::runtime;
 use crate::runtime::enter::EnterContext;
 use crate::runtime::park::{Parker, Unparker};
-use crate::runtime::task::OwnedTasks;
+use crate::runtime::task::{Inject, OwnedTasks};
 use crate::runtime::thread_pool::{AtomicCell, Idle};
 use crate::runtime::{queue, task};
 use crate::util::FastRand;
@@ -70,7 +70,7 @@ pub(super) struct Shared {
     remotes: Box<[Remote]>,
 
     /// Submit work to the scheduler while **not** currently on a worker thread.
-    inject: queue::Inject<Arc<Worker>>,
+    inject: Inject<Arc<Worker>>,
 
     /// Coordinates idle workers
     idle: Idle,
@@ -147,7 +147,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
     let shared = Arc::new(Shared {
         remotes: remotes.into_boxed_slice(),
-        inject: queue::Inject::new(),
+        inject: Inject::new(),
         idle: Idle::new(size),
         owned: OwnedTasks::new(),
         shutdown_cores: Mutex::new(vec![]),
@@ -563,7 +563,7 @@ impl Core {
 
 impl Worker {
     /// Returns a reference to the scheduler's injection queue
-    fn inject(&self) -> &queue::Inject<Arc<Worker>> {
+    fn inject(&self) -> &Inject<Arc<Worker>> {
         &self.shared.inject
     }
}
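
Editor's note (not part of the patch): the `make_fixed_size` helper added to queue.rs works because a length-checked `Box<[T]>` and a `Box<[T; N]>` describe the same allocation, so the raw pointer from `Box::into_raw` can be re-cast to the array type. The standalone sketch below demonstrates only that language mechanism; the `into_fixed` name and the small constant `N` are illustrative and do not appear in tokio.

const N: usize = 4;

// Check the length, then reinterpret the boxed slice as a boxed array.
// `Box::into_raw` yields a fat `*mut [T]`; `.cast()` drops the length
// metadata and produces a thin `*mut [T; N]` pointing at the same allocation.
fn into_fixed<T>(buffer: Box<[T]>) -> Box<[T; N]> {
    assert_eq!(buffer.len(), N);
    // safety: the assert above guarantees the allocation holds exactly N
    // elements, so the array type describes the same memory.
    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}

fn main() {
    let slice: Box<[u32]> = vec![10, 20, 30, 40].into_boxed_slice();
    let array: Box<[u32; N]> = into_fixed(slice);
    assert_eq!(array[3], 40);
}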
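
A second note on the new `Inject::push_batch` signature: it now accepts any `Iterator<Item = task::Notified<T>>`, which lets `push_overflow` hand over the drained batch plus the triggering task in a single call via `chain(std::iter::once(task))`, linking the tasks and taking the queue lock once. The safe sketch below mirrors only that API shape; the `ToyInject` type and its `VecDeque` storage are hypothetical stand-ins, not the intrusive linked list the real inject queue uses.

use std::collections::VecDeque;
use std::sync::Mutex;

// Simplified stand-in for the inject queue: push_batch takes any iterator,
// so a caller can splice extra items onto its batch before handing it over.
struct ToyInject<T> {
    queue: Mutex<VecDeque<T>>,
}

impl<T> ToyInject<T> {
    fn new() -> Self {
        ToyInject { queue: Mutex::new(VecDeque::new()) }
    }

    fn push_batch<I: Iterator<Item = T>>(&self, iter: I) {
        // One lock acquisition for the whole batch, as in push_batch_inner.
        let mut queue = self.queue.lock().unwrap();
        queue.extend(iter);
    }
}

fn main() {
    let inject = ToyInject::new();
    let overflow = vec![1, 2, 3];
    // Mirrors `inject.push_batch(batch_iter.chain(std::iter::once(task)))`.
    inject.push_batch(overflow.into_iter().chain(std::iter::once(4)));
    assert_eq!(inject.queue.lock().unwrap().len(), 4);
}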