diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index cb48e98ef22..906e6129cd6 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -57,6 +57,12 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + + /// How many ticks before pulling a task from the global/remote queue? + global_queue_interval: u8, + + /// How many ticks before yielding to the driver for timer and I/O events? + event_interval: u8, } #[derive(Clone)] @@ -107,15 +113,6 @@ struct Context { /// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; -/// Max number of tasks to poll per tick. -#[cfg(loom)] -const MAX_TASKS_PER_TICK: usize = 4; -#[cfg(not(loom))] -const MAX_TASKS_PER_TICK: usize = 61; - -/// How often to check the remote queue first. -const REMOTE_FIRST_INTERVAL: u8 = 31; - // Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); @@ -125,6 +122,8 @@ impl BasicScheduler { handle_inner: HandleInner, before_park: Option, after_unpark: Option, + global_queue_interval: u8, + event_interval: u8, ) -> BasicScheduler { let unpark = driver.unpark(); @@ -148,6 +147,8 @@ impl BasicScheduler { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(), + global_queue_interval, + event_interval, }))); BasicScheduler { @@ -514,12 +515,12 @@ impl CoreGuard<'_> { } } - for _ in 0..MAX_TASKS_PER_TICK { + for _ in 0..core.event_interval { // Get and increment the current tick let tick = core.tick; core.tick = core.tick.wrapping_add(1); - let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { + let entry = if tick % core.global_queue_interval == 0 { core.spawner.pop().or_else(|| core.tasks.pop_front()) } else { core.tasks.pop_front().or_else(|| core.spawner.pop()) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 618474c05ce..38b712aec55 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -78,6 +78,12 @@ pub struct Builder { /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option, + + /// How many ticks before pulling a task from the global/remote queue? + pub(super) global_queue_interval: u8, + + /// How many ticks before yielding to the driver for timer and I/O events? + pub(super) event_interval: u8, } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; @@ -98,7 +104,20 @@ impl Builder { /// /// [`LocalSet`]: crate::task::LocalSet pub fn new_current_thread() -> Builder { - Builder::new(Kind::CurrentThread) + /// How often to check the remote queue first. + const REMOTE_FIRST_INTERVAL: u8 = 31; + + /// Max number of tasks to poll per tick. + #[cfg(loom)] + const MAX_TASKS_PER_TICK: u8 = 4; + #[cfg(not(loom))] + const MAX_TASKS_PER_TICK: u8 = 61; + + Builder::new( + Kind::CurrentThread, + REMOTE_FIRST_INTERVAL, + MAX_TASKS_PER_TICK, + ) } /// Returns a new builder with the multi thread scheduler selected. @@ -107,14 +126,26 @@ impl Builder { #[cfg(feature = "rt-multi-thread")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] pub fn new_multi_thread() -> Builder { - Builder::new(Kind::MultiThread) + /// After how many ticks is the global queue polled. This helps to ensure + /// fairness. + /// + /// The same value is used to control when to yield to the driver for events. + /// + /// The number is fairly arbitrary. I believe this value was copied from golang. + const GLOBAL_POLL_INTERVAL: u8 = 61; + + Builder::new( + Kind::MultiThread, + GLOBAL_POLL_INTERVAL, + GLOBAL_POLL_INTERVAL, + ) } /// Returns a new runtime builder initialized with default configuration /// values. /// /// Configuration methods can be chained on the return value. - pub(crate) fn new(kind: Kind) -> Builder { + pub(crate) fn new(kind: Kind, global_queue_interval: u8, event_interval: u8) -> Builder { Builder { kind, @@ -145,6 +176,11 @@ impl Builder { after_unpark: None, keep_alive: None, + + // Defaults for these values depend on the scheduler kind, so we get them + // as parameters. + global_queue_interval, + event_interval, } } @@ -554,6 +590,54 @@ impl Builder { self } + /// Sets the number of scheduler ticks after which the scheduler will poll the global + /// task queue. + /// + /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. Schedulers + /// have a local queue of already-claimed tasks, and a global queue of incoming tasks. + /// Setting the interval to a smaller value increases the fairness of the scheduler, + /// at the cost of more synchronization overhead. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_multi_thread() + /// .global_queue_interval(31) + /// .build(); + /// # } + /// ``` + pub fn global_queue_interval(&mut self, val: u8) -> &mut Self { + self.global_queue_interval = val; + self + } + + /// Sets the number of scheduler ticks after which the scheduler will poll for + /// external events (timers, I/O, and so on). + /// + /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. + /// Setting the event interval determines the effective "priority" of delivering + /// these external events (which may wake up additional tasks), compared to + /// executing tasks that are currently ready to run. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_multi_thread() + /// .event_interval(31) + /// .build(); + /// # } + /// ``` + pub fn event_interval(&mut self, val: u8) -> &mut Self { + self.event_interval = val; + self + } + fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::{BasicScheduler, HandleInner, Kind}; @@ -580,6 +664,8 @@ impl Builder { handle_inner, self.before_park.clone(), self.after_unpark.clone(), + self.global_queue_interval, + self.event_interval, ); let spawner = Spawner::Basic(scheduler.spawner().clone()); @@ -667,14 +753,15 @@ cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; - use crate::runtime::{Kind, HandleInner, ThreadPool}; + use crate::runtime::{HandleInner, Kind, ThreadPool}; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); let (driver, resources) = driver::Driver::new(self.get_cfg())?; // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); + let blocking_pool = + blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); let handle_inner = HandleInner { @@ -685,13 +772,19 @@ cfg_rt_multi_thread! { blocking_spawner, }; - let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone()); + let (scheduler, launch) = ThreadPool::new( + core_threads, + driver, + handle_inner, + self.before_park.clone(), + self.after_unpark.clone(), + self.global_queue_interval, + self.event_interval, + ); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the runtime handle - let handle = Handle { - spawner, - }; + let handle = Handle { spawner }; // Spawn the thread pool workers let _enter = crate::runtime::context::enter(handle.clone()); diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index ef6b5775ca2..5e3fbbc1648 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -51,10 +51,19 @@ impl ThreadPool { handle_inner: HandleInner, before_park: Option, after_unpark: Option, + global_queue_interval: u8, + event_interval: u8, ) -> (ThreadPool, Launch) { let parker = Parker::new(driver); - let (shared, launch) = - worker::create(size, parker, handle_inner, before_park, after_unpark); + let (shared, launch) = worker::create( + size, + parker, + handle_inner, + before_park, + after_unpark, + global_queue_interval, + event_interval, + ); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 3d58767f308..9ce5aebbc2a 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -117,6 +117,12 @@ struct Core { /// Fast random number generator. rand: FastRand, + + /// How many ticks before pulling a task from the global/remote queue? + global_queue_interval: u8, + + /// How many ticks before yielding to the driver for timer and I/O events? + event_interval: u8, } /// State shared across all workers @@ -198,6 +204,8 @@ pub(super) fn create( handle_inner: HandleInner, before_park: Option, after_unpark: Option, + global_queue_interval: u8, + event_interval: u8, ) -> (Arc, Launch) { let mut cores = Vec::with_capacity(size); let mut remotes = Vec::with_capacity(size); @@ -219,6 +227,8 @@ pub(super) fn create( park: Some(park), metrics: MetricsBatch::new(), rand: FastRand::new(seed()), + global_queue_interval, + event_interval, })); remotes.push(Remote { steal, unpark }); @@ -346,12 +356,6 @@ where } } -/// After how many ticks is the global queue polled. This helps to ensure -/// fairness. -/// -/// The number is fairly arbitrary. I believe this value was copied from golang. -const GLOBAL_POLL_INTERVAL: u8 = 61; - impl Launch { pub(crate) fn launch(mut self) { for worker in self.0.drain(..) { @@ -464,7 +468,7 @@ impl Context { } fn maintenance(&self, mut core: Box) -> Box { - if core.tick % GLOBAL_POLL_INTERVAL == 0 { + if core.tick % core.event_interval == 0 { // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); @@ -551,7 +555,7 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % GLOBAL_POLL_INTERVAL == 0 { + if self.tick % self.global_queue_interval == 0 { worker.inject().pop().or_else(|| self.next_local_task()) } else { self.next_local_task().or_else(|| worker.inject().pop())