From 36d6d2fec988569b8491b012c24998b940739d75 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Mon, 9 May 2022 15:56:42 -0700 Subject: [PATCH] Make global queue and event polling intervals configurable Adds knobs to the runtime builder to control the number of ticks between polling the global task queue (fairness) and the event driver (I/O prioritization). Both varieties of scheduler already supported these intervals, but they were defined by private constants. Some workloads benefit from customizing these values. Closes #4651 --- tokio/src/runtime/basic_scheduler.rs | 23 ++--- tokio/src/runtime/builder.rs | 111 ++++++++++++++++++++++-- tokio/src/runtime/thread_pool/mod.rs | 13 ++- tokio/src/runtime/thread_pool/worker.rs | 20 +++-- 4 files changed, 137 insertions(+), 30 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index cb48e98ef22..906e6129cd6 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -57,6 +57,12 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + + /// How many ticks before pulling a task from the global/remote queue? + global_queue_interval: u8, + + /// How many ticks before yielding to the driver for timer and I/O events? + event_interval: u8, } #[derive(Clone)] @@ -107,15 +113,6 @@ struct Context { /// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; -/// Max number of tasks to poll per tick. -#[cfg(loom)] -const MAX_TASKS_PER_TICK: usize = 4; -#[cfg(not(loom))] -const MAX_TASKS_PER_TICK: usize = 61; - -/// How often to check the remote queue first. -const REMOTE_FIRST_INTERVAL: u8 = 31; - // Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); @@ -125,6 +122,8 @@ impl BasicScheduler { handle_inner: HandleInner, before_park: Option, after_unpark: Option, + global_queue_interval: u8, + event_interval: u8, ) -> BasicScheduler { let unpark = driver.unpark(); @@ -148,6 +147,8 @@ impl BasicScheduler { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(), + global_queue_interval, + event_interval, }))); BasicScheduler { @@ -514,12 +515,12 @@ impl CoreGuard<'_> { } } - for _ in 0..MAX_TASKS_PER_TICK { + for _ in 0..core.event_interval { // Get and increment the current tick let tick = core.tick; core.tick = core.tick.wrapping_add(1); - let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { + let entry = if tick % core.global_queue_interval == 0 { core.spawner.pop().or_else(|| core.tasks.pop_front()) } else { core.tasks.pop_front().or_else(|| core.spawner.pop()) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 618474c05ce..38b712aec55 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -78,6 +78,12 @@ pub struct Builder { /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option, + + /// How many ticks before pulling a task from the global/remote queue? + pub(super) global_queue_interval: u8, + + /// How many ticks before yielding to the driver for timer and I/O events? + pub(super) event_interval: u8, } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; @@ -98,7 +104,20 @@ impl Builder { /// /// [`LocalSet`]: crate::task::LocalSet pub fn new_current_thread() -> Builder { - Builder::new(Kind::CurrentThread) + /// How often to check the remote queue first. + const REMOTE_FIRST_INTERVAL: u8 = 31; + + /// Max number of tasks to poll per tick. + #[cfg(loom)] + const MAX_TASKS_PER_TICK: u8 = 4; + #[cfg(not(loom))] + const MAX_TASKS_PER_TICK: u8 = 61; + + Builder::new( + Kind::CurrentThread, + REMOTE_FIRST_INTERVAL, + MAX_TASKS_PER_TICK, + ) } /// Returns a new builder with the multi thread scheduler selected. @@ -107,14 +126,26 @@ impl Builder { #[cfg(feature = "rt-multi-thread")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] pub fn new_multi_thread() -> Builder { - Builder::new(Kind::MultiThread) + /// After how many ticks is the global queue polled. This helps to ensure + /// fairness. + /// + /// The same value is used to control when to yield to the driver for events. + /// + /// The number is fairly arbitrary. I believe this value was copied from golang. + const GLOBAL_POLL_INTERVAL: u8 = 61; + + Builder::new( + Kind::MultiThread, + GLOBAL_POLL_INTERVAL, + GLOBAL_POLL_INTERVAL, + ) } /// Returns a new runtime builder initialized with default configuration /// values. /// /// Configuration methods can be chained on the return value. - pub(crate) fn new(kind: Kind) -> Builder { + pub(crate) fn new(kind: Kind, global_queue_interval: u8, event_interval: u8) -> Builder { Builder { kind, @@ -145,6 +176,11 @@ impl Builder { after_unpark: None, keep_alive: None, + + // Defaults for these values depend on the scheduler kind, so we get them + // as parameters. + global_queue_interval, + event_interval, } } @@ -554,6 +590,54 @@ impl Builder { self } + /// Sets the number of scheduler ticks after which the scheduler will poll the global + /// task queue. + /// + /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. Schedulers + /// have a local queue of already-claimed tasks, and a global queue of incoming tasks. + /// Setting the interval to a smaller value increases the fairness of the scheduler, + /// at the cost of more synchronization overhead. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_multi_thread() + /// .global_queue_interval(31) + /// .build(); + /// # } + /// ``` + pub fn global_queue_interval(&mut self, val: u8) -> &mut Self { + self.global_queue_interval = val; + self + } + + /// Sets the number of scheduler ticks after which the scheduler will poll for + /// external events (timers, I/O, and so on). + /// + /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. + /// Setting the event interval determines the effective "priority" of delivering + /// these external events (which may wake up additional tasks), compared to + /// executing tasks that are currently ready to run. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_multi_thread() + /// .event_interval(31) + /// .build(); + /// # } + /// ``` + pub fn event_interval(&mut self, val: u8) -> &mut Self { + self.event_interval = val; + self + } + fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::{BasicScheduler, HandleInner, Kind}; @@ -580,6 +664,8 @@ impl Builder { handle_inner, self.before_park.clone(), self.after_unpark.clone(), + self.global_queue_interval, + self.event_interval, ); let spawner = Spawner::Basic(scheduler.spawner().clone()); @@ -667,14 +753,15 @@ cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; - use crate::runtime::{Kind, HandleInner, ThreadPool}; + use crate::runtime::{HandleInner, Kind, ThreadPool}; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); let (driver, resources) = driver::Driver::new(self.get_cfg())?; // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); + let blocking_pool = + blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); let handle_inner = HandleInner { @@ -685,13 +772,19 @@ cfg_rt_multi_thread! { blocking_spawner, }; - let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone()); + let (scheduler, launch) = ThreadPool::new( + core_threads, + driver, + handle_inner, + self.before_park.clone(), + self.after_unpark.clone(), + self.global_queue_interval, + self.event_interval, + ); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the runtime handle - let handle = Handle { - spawner, - }; + let handle = Handle { spawner }; // Spawn the thread pool workers let _enter = crate::runtime::context::enter(handle.clone()); diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index ef6b5775ca2..5e3fbbc1648 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -51,10 +51,19 @@ impl ThreadPool { handle_inner: HandleInner, before_park: Option, after_unpark: Option, + global_queue_interval: u8, + event_interval: u8, ) -> (ThreadPool, Launch) { let parker = Parker::new(driver); - let (shared, launch) = - worker::create(size, parker, handle_inner, before_park, after_unpark); + let (shared, launch) = worker::create( + size, + parker, + handle_inner, + before_park, + after_unpark, + global_queue_interval, + event_interval, + ); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 3d58767f308..9ce5aebbc2a 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -117,6 +117,12 @@ struct Core { /// Fast random number generator. rand: FastRand, + + /// How many ticks before pulling a task from the global/remote queue? + global_queue_interval: u8, + + /// How many ticks before yielding to the driver for timer and I/O events? + event_interval: u8, } /// State shared across all workers @@ -198,6 +204,8 @@ pub(super) fn create( handle_inner: HandleInner, before_park: Option, after_unpark: Option, + global_queue_interval: u8, + event_interval: u8, ) -> (Arc, Launch) { let mut cores = Vec::with_capacity(size); let mut remotes = Vec::with_capacity(size); @@ -219,6 +227,8 @@ pub(super) fn create( park: Some(park), metrics: MetricsBatch::new(), rand: FastRand::new(seed()), + global_queue_interval, + event_interval, })); remotes.push(Remote { steal, unpark }); @@ -346,12 +356,6 @@ where } } -/// After how many ticks is the global queue polled. This helps to ensure -/// fairness. -/// -/// The number is fairly arbitrary. I believe this value was copied from golang. -const GLOBAL_POLL_INTERVAL: u8 = 61; - impl Launch { pub(crate) fn launch(mut self) { for worker in self.0.drain(..) { @@ -464,7 +468,7 @@ impl Context { } fn maintenance(&self, mut core: Box) -> Box { - if core.tick % GLOBAL_POLL_INTERVAL == 0 { + if core.tick % core.event_interval == 0 { // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); @@ -551,7 +555,7 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % GLOBAL_POLL_INTERVAL == 0 { + if self.tick % self.global_queue_interval == 0 { worker.inject().pop().or_else(|| self.next_local_task()) } else { self.next_local_task().or_else(|| worker.inject().pop())