diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 2f0f8d3f2e9..c9820b758d5 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -57,12 +57,6 @@ struct Core { /// Metrics batch metrics: MetricsBatch, - - /// How many ticks before pulling a task from the global/remote queue? - global_queue_interval: u32, - - /// How many ticks before yielding to the driver for timer and I/O events? - event_interval: u32, } #[derive(Clone)] @@ -70,6 +64,20 @@ pub(crate) struct Spawner { shared: Arc, } +pub(crate) struct Config { + /// How many ticks before pulling a task from the global/remote queue? + pub(crate) global_queue_interval: u32, + + /// How many ticks before yielding to the driver for timer and I/O events? + pub(crate) event_interval: u32, + + /// Callback for a worker parking itself + pub(crate) before_park: Option, + + /// Callback for a worker unparking itself + pub(crate) after_unpark: Option, +} + /// Scheduler state shared between threads. struct Shared { /// Remote run queue. None if the `Runtime` has been dropped. @@ -87,11 +95,8 @@ struct Shared { /// Handle to I/O driver, timer, blocking pool, ... handle_inner: HandleInner, - /// Callback for a worker parking itself - before_park: Option, - - /// Callback for a worker unparking itself - after_unpark: Option, + /// Scheduler configuration options + config: Config, /// Keeps track of various runtime metrics. scheduler_metrics: SchedulerMetrics, @@ -117,14 +122,7 @@ const INITIAL_CAPACITY: usize = 64; scoped_thread_local!(static CURRENT: Context); impl BasicScheduler { - pub(crate) fn new( - driver: Driver, - handle_inner: HandleInner, - before_park: Option, - after_unpark: Option, - global_queue_interval: u32, - event_interval: u32, - ) -> BasicScheduler { + pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> BasicScheduler { let unpark = driver.unpark(); let spawner = Spawner { @@ -134,8 +132,7 @@ impl BasicScheduler { unpark, woken: AtomicBool::new(false), handle_inner, - before_park, - after_unpark, + config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: WorkerMetrics::new(), }), @@ -147,8 +144,6 @@ impl BasicScheduler { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(), - global_queue_interval, - event_interval, }))); BasicScheduler { @@ -302,7 +297,7 @@ impl Context { fn park(&self, mut core: Box) -> Box { let mut driver = core.driver.take().expect("driver missing"); - if let Some(f) = &self.spawner.shared.before_park { + if let Some(f) = &self.spawner.shared.config.before_park { // Incorrect lint, the closures are actually different types so `f` // cannot be passed as an argument to `enter`. #[allow(clippy::redundant_closure)] @@ -325,7 +320,7 @@ impl Context { core.metrics.returned_from_park(); } - if let Some(f) = &self.spawner.shared.after_unpark { + if let Some(f) = &self.spawner.shared.config.after_unpark { // Incorrect lint, the closures are actually different types so `f` // cannot be passed as an argument to `enter`. #[allow(clippy::redundant_closure)] @@ -515,12 +510,12 @@ impl CoreGuard<'_> { } } - for _ in 0..core.event_interval { + for _ in 0..core.spawner.shared.config.event_interval { // Get and increment the current tick let tick = core.tick; core.tick = core.tick.wrapping_add(1); - let entry = if tick % core.global_queue_interval == 0 { + let entry = if tick % core.spawner.shared.config.global_queue_interval == 0 { core.spawner.pop().or_else(|| core.tasks.pop_front()) } else { core.tasks.pop_front().or_else(|| core.spawner.pop()) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 060de489db5..f6d78f736a0 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -632,6 +632,7 @@ impl Builder { } fn build_basic_runtime(&mut self) -> io::Result { + use crate::runtime::basic_scheduler::Config; use crate::runtime::{BasicScheduler, HandleInner, Kind}; let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -655,10 +656,12 @@ impl Builder { let scheduler = BasicScheduler::new( driver, handle_inner, - self.before_park.clone(), - self.after_unpark.clone(), - self.global_queue_interval, - self.event_interval, + Config { + before_park: self.before_park.clone(), + after_unpark: self.after_unpark.clone(), + global_queue_interval: self.global_queue_interval, + event_interval: self.event_interval, + }, ); let spawner = Spawner::Basic(scheduler.spawner().clone());