Skip to content

Commit

Permalink
rt: cleanup and simplify scheduler (scheduler v2.5) (#2273)
Browse files Browse the repository at this point in the history
A refactor of the scheduler internals focusing on simplifying and
reducing unsafety. There are no fundamental logic changes.

* The state transitions of the core task component are refined and
reduced.
* `basic_scheduler` has most unsafety removed.
* `local_set` has most unsafety removed.
* `threaded_scheduler` limits most unsafety to its queue implementation.
  • Loading branch information
carllerche committed Mar 5, 2020
1 parent 5ede2e4 commit a78b1c6
Show file tree
Hide file tree
Showing 77 changed files with 4,396 additions and 6,423 deletions.
18 changes: 12 additions & 6 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,10 @@ jobs:
displayName: Test build permutations
rust: stable

# Run loom tests
- template: ci/azure-loom.yml
# Run miri tests
- template: ci/azure-miri.yml
parameters:
name: loom
rust: stable
crates:
- tokio
name: miri

# Try cross compiling
- template: ci/azure-cross-compile.yml
Expand Down Expand Up @@ -99,16 +96,25 @@ jobs:
# name: tsan
# rust: stable

# Run loom tests
- template: ci/azure-loom.yml
parameters:
name: loom
rust: stable

- template: ci/azure-deploy-docs.yml
parameters:
rust: stable
dependsOn:
- rustfmt
- docs
- clippy
- test_tokio
- test_linux
- test_integration
- test_build
- loom
- miri
- cross
- minrust
- check_features
Expand Down
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ harness = false
name = "mpsc"
path = "mpsc.rs"
harness = false

[[bench]]
name = "scheduler"
path = "scheduler.rs"
harness = false
152 changes: 152 additions & 0 deletions benches/scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! Benchmark implementation details of the theaded scheduler. These benches are
//! intended to be used as a form of regression testing and not as a general
//! purpose benchmark demonstrating real-world performance.

use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot;

use bencher::{benchmark_group, benchmark_main, Bencher};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};

fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;

let mut rt = rt();

let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));

b.iter(|| {
rem.store(NUM_SPAWN, Relaxed);

rt.block_on(async {
for _ in 0..NUM_SPAWN {
let tx = tx.clone();
let rem = rem.clone();

tokio::spawn(async move {
if 1 == rem.fetch_sub(1, Relaxed) {
tx.send(()).unwrap();
}
});
}

let _ = rx.recv().unwrap();
});
});
}

fn yield_many(b: &mut Bencher) {
const NUM_YIELD: usize = 1_000;
const TASKS: usize = 200;

let rt = rt();

let (tx, rx) = mpsc::sync_channel(TASKS);

b.iter(move || {
for _ in 0..TASKS {
let tx = tx.clone();

rt.spawn(async move {
for _ in 0..NUM_YIELD {
tokio::task::yield_now().await;
}

tx.send(()).unwrap();
});
}

for _ in 0..TASKS {
let _ = rx.recv().unwrap();
}
});
}

fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;

let mut rt = rt();

let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));

b.iter(|| {
let done_tx = done_tx.clone();
let rem = rem.clone();
rem.store(NUM_PINGS, Relaxed);

rt.block_on(async {
tokio::spawn(async move {
for _ in 0..NUM_PINGS {
let rem = rem.clone();
let done_tx = done_tx.clone();

tokio::spawn(async move {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async move {
rx1.await.unwrap();
tx2.send(()).unwrap();
});

tx1.send(()).unwrap();
rx2.await.unwrap();

if 1 == rem.fetch_sub(1, Relaxed) {
done_tx.send(()).unwrap();
}
});
}
});

done_rx.recv().unwrap();
});
});
}

fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;

let mut rt = rt();

fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
done_tx.send(()).unwrap();
} else {
tokio::spawn(async move {
iter(done_tx, n - 1);
});
}
}

let (done_tx, done_rx) = mpsc::sync_channel(1000);

b.iter(move || {
let done_tx = done_tx.clone();

rt.block_on(async {
tokio::spawn(async move {
iter(done_tx, ITER);
});

done_rx.recv().unwrap();
});
});
}

fn rt() -> Runtime {
runtime::Builder::new()
.threaded_scheduler()
.core_threads(4)
.enable_all()
.build()
.unwrap()
}

benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,);

benchmark_main!(scheduler);
2 changes: 1 addition & 1 deletion ci/azure-check-features.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
Linux:
vmImage: ubuntu-16.04
MacOS:
vmImage: macOS-10.13
vmImage: macos-latest
Windows:
vmImage: vs2017-win2016
pool:
Expand Down
7 changes: 7 additions & 0 deletions ci/azure-install-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ steps:
# Linux and macOS.
- script: |
set -e
if [ "$RUSTUP_TOOLCHAIN" == "nightly" ]; then
echo "++ getting latest miri version"
export RUSTUP_TOOLCHAIN="nightly-$(curl -s https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/miri)"
echo "$RUSTUP_TOOLCHAIN"
fi
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none
export PATH=$PATH:$HOME/.cargo/bin
rustup toolchain install $RUSTUP_TOOLCHAIN
Expand Down
25 changes: 18 additions & 7 deletions ci/azure-loom.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
jobs:
- job: ${{ parameters.name }}
displayName: Loom tests
strategy:
matrix:
rest:
scope: --skip loom_pool
pool_group_a:
scope: loom_pool::group_a
pool_group_b:
scope: loom_pool::group_b
pool_group_c:
scope: loom_pool::group_c
pool_group_d:
scope: loom_pool::group_d
pool:
vmImage: ubuntu-16.04

Expand All @@ -9,10 +21,9 @@ jobs:
parameters:
rust_version: ${{ parameters.rust }}

- ${{ each crate in parameters.crates }}:
- script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --test-threads=1 --nocapture
env:
LOOM_MAX_PREEMPTIONS: 1
CI: 'True'
displayName: test ${{ crate }}
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
- script: RUSTFLAGS="--cfg loom" cargo test --lib --release --features "full" -- --nocapture $(scope)
env:
LOOM_MAX_PREEMPTIONS: 2
CI: 'True'
displayName: $(scope)
workingDirectory: $(Build.SourcesDirectory)/tokio
23 changes: 23 additions & 0 deletions ci/azure-miri.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
jobs:
- job: ${{ parameters.name }}
displayName: Miri
pool:
vmImage: ubuntu-16.04

steps:
- template: azure-install-rust.yml
parameters:
rust_version: nightly

- script: |
rustup component add miri
cargo miri setup
rm -rf $(Build.SourcesDirectory)/tokio/tests
displayName: Install miri
# TODO: enable all tests once they pass
- script: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task
env:
CI: 'True'
displayName: cargo miri test
workingDirectory: $(Build.SourcesDirectory)/tokio
2 changes: 1 addition & 1 deletion ci/azure-test-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
Linux:
vmImage: ubuntu-16.04
MacOS:
vmImage: macOS-10.13
vmImage: macos-latest
Windows:
vmImage: vs2017-win2016
pool:
Expand Down
2 changes: 1 addition & 1 deletion ci/azure-test-stable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:

${{ if parameters.cross }}:
MacOS:
vmImage: macOS-10.13
vmImage: macos-latest
Windows:
vmImage: vs2017-win2016
pool:
Expand Down
4 changes: 0 additions & 4 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,6 @@ cfg_macros! {
}
}

// Tests
#[cfg(test)]
mod tests;

// TODO: rm
#[cfg(feature = "io-util")]
#[cfg(test)]
Expand Down
18 changes: 0 additions & 18 deletions tokio/src/loom/std/alloc.rs

This file was deleted.

9 changes: 0 additions & 9 deletions tokio/src/loom/std/causal_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@ impl<T> CausalCell<T> {
f(self.0.get())
}

pub(crate) fn with_unchecked<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}

pub(crate) fn check(&self) {}

pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
where
F: FnOnce(*const T) -> R,
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ mod atomic_u64;
mod atomic_usize;
mod causal_cell;

pub(crate) mod alloc;

pub(crate) mod cell {
pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
}
Expand Down
18 changes: 0 additions & 18 deletions tokio/src/macros/assert.rs

This file was deleted.

0 comments on commit a78b1c6

Please sign in to comment.