Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: clean up arguments passed to basic scheduler #4767

Merged
merged 1 commit into from Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 22 additions & 27 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -57,19 +57,27 @@ struct Core {

/// Metrics batch
metrics: MetricsBatch,

/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u32,
}

#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<Shared>,
}

pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue. None if the `Runtime` has been dropped.
Expand All @@ -87,11 +95,8 @@ struct Shared {
/// Handle to I/O driver, timer, blocking pool, ...
handle_inner: HandleInner,

/// Callback for a worker parking itself
before_park: Option<Callback>,

/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Scheduler configuration options
config: Config,

/// Keeps track of various runtime metrics.
scheduler_metrics: SchedulerMetrics,
Expand All @@ -117,14 +122,7 @@ const INITIAL_CAPACITY: usize = 64;
scoped_thread_local!(static CURRENT: Context);

impl BasicScheduler {
pub(crate) fn new(
driver: Driver,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
) -> BasicScheduler {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> BasicScheduler {
let unpark = driver.unpark();

let spawner = Spawner {
Expand All @@ -134,8 +132,7 @@ impl BasicScheduler {
unpark,
woken: AtomicBool::new(false),
handle_inner,
before_park,
after_unpark,
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: WorkerMetrics::new(),
}),
Expand All @@ -147,8 +144,6 @@ impl BasicScheduler {
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(),
global_queue_interval,
event_interval,
})));

BasicScheduler {
Expand Down Expand Up @@ -302,7 +297,7 @@ impl Context {
fn park(&self, mut core: Box<Core>) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");

if let Some(f) = &self.spawner.shared.before_park {
if let Some(f) = &self.spawner.shared.config.before_park {
// Incorrect lint, the closures are actually different types so `f`
// cannot be passed as an argument to `enter`.
#[allow(clippy::redundant_closure)]
Expand All @@ -325,7 +320,7 @@ impl Context {
core.metrics.returned_from_park();
}

if let Some(f) = &self.spawner.shared.after_unpark {
if let Some(f) = &self.spawner.shared.config.after_unpark {
// Incorrect lint, the closures are actually different types so `f`
// cannot be passed as an argument to `enter`.
#[allow(clippy::redundant_closure)]
Expand Down Expand Up @@ -515,12 +510,12 @@ impl CoreGuard<'_> {
}
}

for _ in 0..core.event_interval {
for _ in 0..core.spawner.shared.config.event_interval {
// Get and increment the current tick
let tick = core.tick;
core.tick = core.tick.wrapping_add(1);

let entry = if tick % core.global_queue_interval == 0 {
let entry = if tick % core.spawner.shared.config.global_queue_interval == 0 {
core.spawner.pop().or_else(|| core.tasks.pop_front())
} else {
core.tasks.pop_front().or_else(|| core.spawner.pop())
Expand Down
11 changes: 7 additions & 4 deletions tokio/src/runtime/builder.rs
Expand Up @@ -632,6 +632,7 @@ impl Builder {
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::basic_scheduler::Config;
use crate::runtime::{BasicScheduler, HandleInner, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;
Expand All @@ -655,10 +656,12 @@ impl Builder {
let scheduler = BasicScheduler::new(
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
},
);
let spawner = Spawner::Basic(scheduler.spawner().clone());

Expand Down