diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index cb48e98ef22..2f0f8d3f2e9 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -48,7 +48,7 @@ struct Core {
     spawner: Spawner,
 
     /// Current tick
-    tick: u8,
+    tick: u32,
 
     /// Runtime driver
     ///
@@ -57,6 +57,12 @@ struct Core {
 
     /// Metrics batch
     metrics: MetricsBatch,
+
+    /// How many ticks before pulling a task from the global/remote queue?
+    global_queue_interval: u32,
+
+    /// How many ticks before yielding to the driver for timer and I/O events?
+    event_interval: u32,
 }
 
 #[derive(Clone)]
@@ -107,15 +113,6 @@ struct Context {
 /// Initial queue capacity.
 const INITIAL_CAPACITY: usize = 64;
 
-/// Max number of tasks to poll per tick.
-#[cfg(loom)]
-const MAX_TASKS_PER_TICK: usize = 4;
-#[cfg(not(loom))]
-const MAX_TASKS_PER_TICK: usize = 61;
-
-/// How often to check the remote queue first.
-const REMOTE_FIRST_INTERVAL: u8 = 31;
-
 // Tracks the current BasicScheduler.
 scoped_thread_local!(static CURRENT: Context);
 
@@ -125,6 +122,8 @@ impl BasicScheduler {
         handle_inner: HandleInner,
         before_park: Option<Callback>,
         after_unpark: Option<Callback>,
+        global_queue_interval: u32,
+        event_interval: u32,
     ) -> BasicScheduler {
         let unpark = driver.unpark();
 
@@ -148,6 +147,8 @@ impl BasicScheduler {
             tick: 0,
             driver: Some(driver),
             metrics: MetricsBatch::new(),
+            global_queue_interval,
+            event_interval,
         })));
 
         BasicScheduler {
@@ -514,12 +515,12 @@ impl CoreGuard<'_> {
                     }
                 }
 
-                for _ in 0..MAX_TASKS_PER_TICK {
+                for _ in 0..core.event_interval {
                     // Get and increment the current tick
                     let tick = core.tick;
                     core.tick = core.tick.wrapping_add(1);
 
-                    let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
+                    let entry = if tick % core.global_queue_interval == 0 {
                         core.spawner.pop().or_else(|| core.tasks.pop_front())
                     } else {
                         core.tasks.pop_front().or_else(|| core.spawner.pop())
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 618474c05ce..060de489db5 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -78,6 +78,12 @@ pub struct Builder {
 
     /// Customizable keep alive timeout for BlockingPool
     pub(super) keep_alive: Option<Duration>,
+
+    /// How many ticks before pulling a task from the global/remote queue?
+    pub(super) global_queue_interval: u32,
+
+    /// How many ticks before yielding to the driver for timer and I/O events?
+    pub(super) event_interval: u32,
 }
 
 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
@@ -98,7 +104,13 @@ impl Builder {
     ///
    /// [`LocalSet`]: crate::task::LocalSet
     pub fn new_current_thread() -> Builder {
-        Builder::new(Kind::CurrentThread)
+        #[cfg(loom)]
+        const EVENT_INTERVAL: u32 = 4;
+        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
+        #[cfg(not(loom))]
+        const EVENT_INTERVAL: u32 = 61;
+
+        Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL)
     }
 
     /// Returns a new builder with the multi thread scheduler selected.
@@ -107,14 +119,15 @@ impl Builder {
     #[cfg(feature = "rt-multi-thread")]
     #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
     pub fn new_multi_thread() -> Builder {
-        Builder::new(Kind::MultiThread)
+        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
+        Builder::new(Kind::MultiThread, 61, 61)
     }
 
     /// Returns a new runtime builder initialized with default configuration
     /// values.
     ///
     /// Configuration methods can be chained on the return value.
-    pub(crate) fn new(kind: Kind) -> Builder {
+    pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder {
         Builder {
             kind,
 
@@ -145,6 +158,11 @@ impl Builder {
             after_unpark: None,
 
             keep_alive: None,
+
+            // Defaults for these values depend on the scheduler kind, so we get them
+            // as parameters.
+            global_queue_interval,
+            event_interval,
         }
     }
 
@@ -286,7 +304,6 @@ impl Builder {
     /// ```
     /// # use tokio::runtime;
     /// # use std::sync::atomic::{AtomicUsize, Ordering};
-    ///
     /// # pub fn main() {
     /// let rt = runtime::Builder::new_multi_thread()
     ///     .thread_name_fn(|| {
@@ -338,7 +355,6 @@ impl Builder {
     ///
     /// ```
     /// # use tokio::runtime;
-    ///
     /// # pub fn main() {
     /// let runtime = runtime::Builder::new_multi_thread()
     ///     .on_thread_start(|| {
@@ -364,7 +380,6 @@ impl Builder {
     ///
     /// ```
     /// # use tokio::runtime;
-    ///
     /// # pub fn main() {
     /// let runtime = runtime::Builder::new_multi_thread()
     ///     .on_thread_stop(|| {
@@ -473,7 +488,6 @@ impl Builder {
     ///
     /// ```
     /// # use tokio::runtime;
-    ///
     /// # pub fn main() {
     /// let runtime = runtime::Builder::new_multi_thread()
     ///     .on_thread_unpark(|| {
@@ -542,7 +556,6 @@ impl Builder {
     /// ```
     /// # use tokio::runtime;
     /// # use std::time::Duration;
-    ///
     /// # pub fn main() {
     /// let rt = runtime::Builder::new_multi_thread()
     ///     .thread_keep_alive(Duration::from_millis(100))
@@ -554,6 +567,70 @@ impl Builder {
         self
     }
 
+    /// Sets the number of scheduler ticks after which the scheduler will poll the global
+    /// task queue.
+    ///
+    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
+    ///
+    /// By default the global queue interval is:
+    ///
+    /// * `31` for the current-thread scheduler.
+    /// * `61` for the multithreaded scheduler.
+    ///
+    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
+    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
+    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
+    /// getting started on new work, especially if tasks frequently yield rather than complete
+    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
+    /// is a good choice when most tasks quickly complete polling.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use tokio::runtime;
+    /// # pub fn main() {
+    /// let rt = runtime::Builder::new_multi_thread()
+    ///     .global_queue_interval(31)
+    ///     .build();
+    /// # }
+    /// ```
+    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
+        self.global_queue_interval = val;
+        self
+    }
+
+    /// Sets the number of scheduler ticks after which the scheduler will poll for
+    /// external events (timers, I/O, and so on).
+    ///
+    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
+    ///
+    /// By default, the event interval is `61` for all scheduler types.
+    ///
+    /// Setting the event interval determines the effective "priority" of delivering
+    /// these external events (which may wake up additional tasks), compared to
+    /// executing tasks that are currently ready to run. A smaller value is useful
+    /// when tasks frequently spend a long time in polling, or frequently yield,
+    /// which can result in overly long delays picking up I/O events. Conversely,
+    /// picking up new events requires extra synchronization and syscall overhead,
+    /// so if tasks generally complete their polling quickly, a higher event interval
+    /// will minimize that overhead while still keeping the scheduler responsive to
+    /// events.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use tokio::runtime;
+    /// # pub fn main() {
+    /// let rt = runtime::Builder::new_multi_thread()
+    ///     .event_interval(31)
+    ///     .build();
+    /// # }
+    /// ```
+    pub fn event_interval(&mut self, val: u32) -> &mut Self {
+        self.event_interval = val;
+        self
+    }
+
     fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
         use crate::runtime::{BasicScheduler, HandleInner, Kind};
 
@@ -580,6 +657,8 @@ impl Builder {
             handle_inner,
             self.before_park.clone(),
             self.after_unpark.clone(),
+            self.global_queue_interval,
+            self.event_interval,
         );
 
         let spawner = Spawner::Basic(scheduler.spawner().clone());
@@ -667,14 +746,15 @@ cfg_rt_multi_thread! {
     impl Builder {
         fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
             use crate::loom::sys::num_cpus;
-            use crate::runtime::{Kind, HandleInner, ThreadPool};
+            use crate::runtime::{HandleInner, Kind, ThreadPool};
 
             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
 
             let (driver, resources) = driver::Driver::new(self.get_cfg())?;
 
             // Create the blocking pool
-            let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
+            let blocking_pool =
+                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
             let blocking_spawner = blocking_pool.spawner().clone();
 
             let handle_inner = HandleInner {
@@ -685,13 +765,19 @@ cfg_rt_multi_thread! {
                 blocking_spawner,
             };
 
-            let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone());
+            let (scheduler, launch) = ThreadPool::new(
+                core_threads,
+                driver,
+                handle_inner,
+                self.before_park.clone(),
+                self.after_unpark.clone(),
+                self.global_queue_interval,
+                self.event_interval,
+            );
             let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
 
             // Create the runtime handle
-            let handle = Handle {
-                spawner,
-            };
+            let handle = Handle { spawner };
 
             // Spawn the thread pool workers
             let _enter = crate::runtime::context::enter(handle.clone());
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index ef6b5775ca2..5ac71e139d9 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -51,10 +51,19 @@ impl ThreadPool {
         handle_inner: HandleInner,
         before_park: Option<Callback>,
         after_unpark: Option<Callback>,
+        global_queue_interval: u32,
+        event_interval: u32,
     ) -> (ThreadPool, Launch) {
         let parker = Parker::new(driver);
-        let (shared, launch) =
-            worker::create(size, parker, handle_inner, before_park, after_unpark);
+        let (shared, launch) = worker::create(
+            size,
+            parker,
+            handle_inner,
+            before_park,
+            after_unpark,
+            global_queue_interval,
+            event_interval,
+        );
         let spawner = Spawner { shared };
         let thread_pool = ThreadPool { spawner };
 
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 3d58767f308..b01c5bc4749 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -87,7 +87,7 @@ pub(super) struct Worker {
 /// Core data
 struct Core {
     /// Used to schedule bookkeeping tasks every so often.
-    tick: u8,
+    tick: u32,
 
     /// When a task is scheduled from a worker, it is stored in this slot. The
     /// worker will check this slot for a task **before** checking the run
@@ -117,6 +117,12 @@ struct Core {
 
     /// Fast random number generator.
     rand: FastRand,
+
+    /// How many ticks before pulling a task from the global/remote queue?
+    global_queue_interval: u32,
+
+    /// How many ticks before yielding to the driver for timer and I/O events?
+    event_interval: u32,
 }
 
 /// State shared across all workers
@@ -198,6 +204,8 @@ pub(super) fn create(
     handle_inner: HandleInner,
     before_park: Option<Callback>,
     after_unpark: Option<Callback>,
+    global_queue_interval: u32,
+    event_interval: u32,
 ) -> (Arc<Shared>, Launch) {
     let mut cores = Vec::with_capacity(size);
     let mut remotes = Vec::with_capacity(size);
@@ -219,6 +227,8 @@ pub(super) fn create(
             park: Some(park),
             metrics: MetricsBatch::new(),
             rand: FastRand::new(seed()),
+            global_queue_interval,
+            event_interval,
         }));
 
         remotes.push(Remote { steal, unpark });
@@ -346,12 +356,6 @@ where
     }
 }
 
-/// After how many ticks is the global queue polled. This helps to ensure
-/// fairness.
-///
-/// The number is fairly arbitrary. I believe this value was copied from golang.
-const GLOBAL_POLL_INTERVAL: u8 = 61;
-
 impl Launch {
     pub(crate) fn launch(mut self) {
         for worker in self.0.drain(..) {
@@ -464,7 +468,7 @@ impl Context {
     }
 
     fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
-        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
+        if core.tick % core.event_interval == 0 {
             // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
             // to run without actually putting the thread to sleep.
             core = self.park_timeout(core, Some(Duration::from_millis(0)));
@@ -551,7 +555,7 @@ impl Core {
 
     /// Return the next notified task available to this worker.
     fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
-        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
+        if self.tick % self.global_queue_interval == 0 {
             worker.inject().pop().or_else(|| self.next_local_task())
         } else {
             self.next_local_task().or_else(|| worker.inject().pop())
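
For reference, here is a minimal sketch of how the two new builder knobs might be used together once this change is in place. The interval values `16` and `32` are arbitrary picks for illustration, not recommended defaults, and the snippet assumes the `rt-multi-thread` feature is enabled.

```rust
use tokio::runtime;

fn main() -> std::io::Result<()> {
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(2)
        // Check the global (injection) queue every 16 ticks instead of the
        // default 61, trading some synchronization overhead for better
        // fairness toward newly spawned tasks.
        .global_queue_interval(16)
        // Yield to the driver for timer and I/O events every 32 ticks instead
        // of the default 61, favoring responsiveness to external events.
        .event_interval(32)
        .build()?;

    rt.block_on(async {
        println!("runtime configured");
    });
    Ok(())
}
```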