Skip to content

Commit

Permalink
rt: refactor current-thread scheduler (take 2) (#4395)
Browse files Browse the repository at this point in the history
Re-applies #4377 and fixes the bug resulting in Hyper's double panic.

Revert: #4394

Original PR:

This PR does some refactoring to the current-thread scheduler bringing it closer to the structure of the
multi-threaded scheduler. More specifically, the core scheduler data is stored in a Core struct and that
struct is passed around as a "token" indicating permission to do work. The Core structure is also stored
in the thread-local context.

This refactor is intended to support #4373, making it easier to track counters in more locations in the
current-thread scheduler.

I tried to keep commits small, but the "set Core in thread-local context" is both the biggest commit and
the key one.
  • Loading branch information
carllerche committed Jan 12, 2022
1 parent 1d698b5 commit e951d55
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 215 deletions.
420 changes: 224 additions & 196 deletions tokio/src/runtime/basic_scheduler.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
Expand Up @@ -283,7 +283,7 @@ cfg_rt! {
#[derive(Debug)]
enum Kind {
/// Execute all tasks on the current-thread.
CurrentThread(BasicScheduler<driver::Driver>),
CurrentThread(BasicScheduler),

/// Execute tasks across multiple threads.
#[cfg(feature = "rt-multi-thread")]
Expand Down
20 changes: 11 additions & 9 deletions tokio/src/runtime/tests/loom_basic_scheduler.rs
Expand Up @@ -34,20 +34,22 @@ fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
#[test]
fn block_on_num_polls() {
loom::model(|| {
// we expect at most 3 number of polls because there are
// three points at which we poll the future. At any of these
// points it can be ready:
// we expect at most 4 polls because there are three points at which we
// poll the future, plus an opportunity for a false positive. At any of
// these points it can be ready:
//
// - when we fail to steal the parker and we block on a
// notification that it is available.
// - when we fail to steal the parker and we block on a notification
// that it is available.
//
// - when we steal the parker and we schedule the future
//
// - when the future is woken up and we have ran the max
// number of tasks for the current tick or there are no
// more tasks to run.
// - when the future is woken up and we have ran the max number of tasks
// for the current tick or there are no more tasks to run.
//
let at_most = 3;
// - a thread is notified that the parker is available but a third
// thread acquires it before the notified thread can.
//
let at_most = 4;

let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
let rt2 = rt1.clone();
Expand Down
3 changes: 0 additions & 3 deletions tokio/src/runtime/thread_pool/mod.rs
@@ -1,8 +1,5 @@
//! Threadpool

mod atomic_cell;
use atomic_cell::AtomicCell;

mod idle;
use self::idle::Idle;

Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -66,8 +66,9 @@ use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::thread_pool::Idle;
use crate::runtime::{queue, task, Callback};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;

use std::cell::RefCell;
Expand Down
Expand Up @@ -3,30 +3,30 @@ use crate::loom::sync::atomic::AtomicPtr;
use std::ptr;
use std::sync::atomic::Ordering::AcqRel;

pub(super) struct AtomicCell<T> {
pub(crate) struct AtomicCell<T> {
data: AtomicPtr<T>,
}

unsafe impl<T: Send> Send for AtomicCell<T> {}
unsafe impl<T: Send> Sync for AtomicCell<T> {}

impl<T> AtomicCell<T> {
pub(super) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
pub(crate) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
AtomicCell {
data: AtomicPtr::new(to_raw(data)),
}
}

pub(super) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
pub(crate) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let old = self.data.swap(to_raw(val), AcqRel);
from_raw(old)
}

pub(super) fn set(&self, val: Box<T>) {
pub(crate) fn set(&self, val: Box<T>) {
let _ = self.swap(Some(val));
}

pub(super) fn take(&self) -> Option<Box<T>> {
pub(crate) fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Expand Up @@ -3,6 +3,9 @@ cfg_io_driver! {
pub(crate) mod slab;
}

#[cfg(feature = "rt")]
pub(crate) mod atomic_cell;

#[cfg(any(
// io driver uses `WakeList` directly
feature = "net",
Expand Down
29 changes: 29 additions & 0 deletions tokio/tests/rt_basic.rs
Expand Up @@ -168,6 +168,35 @@ fn drop_tasks_in_context() {
assert!(SUCCESS.load(Ordering::SeqCst));
}

// Regression test: a task whose destructor wakes another task must not cause
// a double panic while the runtime is being torn down by a panic inside
// `block_on` (per the commit message, this reproduced Hyper's double panic —
// see #4394). The panic message must still propagate, hence `should_panic`.
#[test]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
    let (tx, rx) = oneshot::channel::<()>();

    // Sends on the oneshot channel when dropped, i.e. it wakes the task
    // waiting on `rx` from inside task teardown.
    struct WakeOnDrop(Option<oneshot::Sender<()>>);

    impl Drop for WakeOnDrop {
        fn drop(&mut self) {
            self.0.take().unwrap().send(()).unwrap();
        }
    }

    let rt = rt();

    // Task 1: never completes on its own; dropping it fires `WakeOnDrop`.
    rt.spawn(async move {
        let _wake_on_drop = WakeOnDrop(Some(tx));
        // wait forever
        futures::future::pending::<()>().await;
    });

    // Task 2: parked on `rx`, so it is the task woken during task 1's drop.
    let _join = rt.spawn(async move { rx.await });

    // Yield once so the spawned tasks get polled, then panic; runtime
    // shutdown drops task 1, which wakes task 2 mid-unwind.
    rt.block_on(async {
        tokio::task::yield_now().await;
        panic!("boom");
    });
}

#[test]
#[should_panic(
expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
Expand Down

0 comments on commit e951d55

Please sign in to comment.