Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include more documentation for thread_pool/worker #4511

Merged
merged 1 commit into from Mar 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -126,7 +126,9 @@ pub(super) struct Shared {
/// how they communicate between each other.
remotes: Box<[Remote]>,

/// Submits work to the scheduler while **not** currently on a worker thread.
/// Global task queue used for:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker run queue is saturated
inject: Inject<Arc<Shared>>,

/// Coordinates idle workers
Expand Down Expand Up @@ -470,6 +472,17 @@ impl Context {
core
}

/// Parks the worker thread while waiting for tasks to execute.
///
/// This function checks if indeed there's no more work left to be done before parking.
/// Also important to notice that, before parking, the worker thread will try to take
/// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
/// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
/// in its own local queue until the queue saturates (ntasks > LOCAL_QUEUE_CAPACITY).
/// When the local queue is saturated, the overflow tasks are added to the injection queue
/// from where other workers can pick them up.
/// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
/// after all the IOs get dispatched
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.shared.before_park {
f();
Expand Down Expand Up @@ -545,6 +558,11 @@ impl Core {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
}

/// Function responsible for stealing tasks from another worker
///
/// Note: Only if less than half the workers are searching for tasks to steal
/// a new worker will actually try to steal. The idea is to make sure not all
/// workers will be trying to steal at the same time.
fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
if !self.transition_to_searching(worker) {
return None;
Expand Down