rt: fix basic_scheduler notification bug (#1861)
The "global executor" thread-local is to track where to spawn new tasks,
**not** which scheduler is active on the current thread. This fixes a
bug with scheduling tasks on the basic_scheduler by tracking the
currently active basic_scheduler with a dedicated thread-local variable.

Fixes: #1851
carllerche committed Nov 29, 2019
1 parent ec7f2ae commit a2cfc87
Showing 5 changed files with 95 additions and 10 deletions.
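
As the commit message above explains, the fix keys the question "is this scheduler the one running on the current thread?" off a dedicated thread-local rather than off the spawn-target ("global executor") thread-local. The following is a minimal, self-contained sketch of that pattern for readers skimming the diff. It is illustration only, not part of the commit: `Scheduler`, `enter`, and `is_current` are simplified stand-ins for tokio's private `SchedulerPriv` and the corresponding code inside `block_on` and `schedule`.

use std::cell::Cell;
use std::ptr;

// Hypothetical stand-in for tokio's private `SchedulerPriv` type.
struct Scheduler;

thread_local! {
    // Pointer to the scheduler currently driving this thread, or null.
    static ACTIVE: Cell<*const Scheduler> = Cell::new(ptr::null());
}

// RAII guard that restores the previous value once the scheduler stops
// running on this thread (mirrors the `Guard` added in the diff below).
struct Guard {
    old: *const Scheduler,
}

impl Drop for Guard {
    fn drop(&mut self) {
        ACTIVE.with(|cell| cell.set(self.old));
    }
}

// Called when a scheduler begins a `block_on`-style run on this thread.
fn enter(scheduler: &Scheduler) -> Guard {
    ACTIVE.with(|cell| {
        let guard = Guard { old: cell.get() };
        cell.set(scheduler as *const Scheduler);
        guard
    })
}

// The question `schedule` now asks: is *this* scheduler running here?
fn is_current(scheduler: &Scheduler) -> bool {
    ACTIVE.with(|cell| cell.get() == scheduler as *const Scheduler)
}

fn main() {
    let scheduler = Scheduler;
    assert!(!is_current(&scheduler));
    {
        let _guard = enter(&scheduler);
        assert!(is_current(&scheduler));
    }
    // Guard dropped: the thread no longer claims an active scheduler.
    assert!(!is_current(&scheduler));
}

Because the guard restores whatever value it replaced, nested runs and panics that unwind out of the scheduler both leave the thread-local in its previous state.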
34 changes: 31 additions & 3 deletions tokio/src/runtime/basic_scheduler.rs
@@ -1,11 +1,12 @@
 use crate::park::{Park, Unpark};
 use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};

-use std::cell::UnsafeCell;
+use std::cell::{Cell, UnsafeCell};
 use std::collections::VecDeque;
 use std::fmt;
 use std::future::Future;
 use std::mem::ManuallyDrop;
+use std::ptr;
 use std::sync::{Arc, Mutex};
 use std::task::{RawWaker, RawWakerVTable, Waker};
 use std::time::Duration;
@@ -87,6 +88,10 @@ const MAX_TASKS_PER_TICK: usize = 61;
 /// How often to check the remote queue first
 const CHECK_REMOTE_INTERVAL: u8 = 13;

+thread_local! {
+    static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null())
+}
+
 impl<P> BasicScheduler<P>
 where
     P: Park,
@@ -138,6 +143,27 @@
         let local = &mut self.local;
         let scheduler = &*self.scheduler;

+        struct Guard {
+            old: *const SchedulerPriv,
+        }
+
+        impl Drop for Guard {
+            fn drop(&mut self) {
+                ACTIVE.with(|cell| cell.set(self.old));
+            }
+        }
+
+        // Track the current scheduler
+        let _guard = ACTIVE.with(|cell| {
+            let guard = Guard {
+                old: cell.get(),
+            };
+
+            cell.set(scheduler as *const SchedulerPriv);
+
+            guard
+        });
+
         runtime::global::with_basic_scheduler(scheduler, || {
             let mut _enter = runtime::enter();

@@ -283,9 +309,11 @@ impl Schedule for SchedulerPriv {
     }

     fn schedule(&self, task: Task<Self>) {
-        use crate::runtime::global;
+        let is_current = ACTIVE.with(|cell| {
+            cell.get() == self as *const SchedulerPriv
+        });

-        if global::basic_scheduler_is_current(self) {
+        if is_current {
             unsafe { self.schedule_local(task) };
         } else {
             let mut lock = self.remote_queue.lock().unwrap();
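The `is_current` check above gates which queue `schedule` uses: the scheduler's unsynchronized local queue when the scheduler itself is running on the current thread, or the mutex-protected remote queue otherwise. Below is a rough sketch of that dispatch. The `Task` alias and the queue fields are assumptions made for illustration, not the actual `SchedulerPriv` layout, and the notification (unpark) of the scheduler thread that the real remote path is responsible for is reduced to a comment.

use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::Mutex;

// Simplified task type, for illustration only.
type Task = Box<dyn FnOnce() + Send>;

// Hypothetical stand-in for `SchedulerPriv`: a local queue touched only
// from the scheduler's own thread, plus a shared remote queue.
struct Scheduler {
    local_queue: RefCell<VecDeque<Task>>,
    remote_queue: Mutex<VecDeque<Task>>,
}

impl Scheduler {
    fn schedule(&self, task: Task, is_current: bool) {
        if is_current {
            // Scheduler's own thread: no lock and no wakeup needed; the
            // task is picked up on the next tick of the run loop.
            self.local_queue.borrow_mut().push_back(task);
        } else {
            // Another thread: push through the shared queue. The real
            // scheduler also notifies (unparks) its thread at this point
            // so the new task is noticed promptly.
            self.remote_queue.lock().unwrap().push_back(task);
        }
    }
}

fn main() {
    let scheduler = Scheduler {
        local_queue: RefCell::new(VecDeque::new()),
        remote_queue: Mutex::new(VecDeque::new()),
    };
    scheduler.schedule(Box::new(|| println!("local")), true);
    scheduler.schedule(Box::new(|| println!("remote")), false);
    assert_eq!(scheduler.local_queue.borrow().len(), 1);
    assert_eq!(scheduler.remote_queue.lock().unwrap().len(), 1);
}

This is why conflating "where spawned tasks should go" with "which scheduler is running here" mattered: a waker firing on another thread, such as a blocking-pool thread of the kind the new fs and task_blocking tests appear to exercise, must take the remote path so the parked scheduler thread actually gets woken.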
7 changes: 0 additions & 7 deletions tokio/src/runtime/global.rs
@@ -65,13 +65,6 @@
     )
 }

-pub(super) fn basic_scheduler_is_current(basic_scheduler: &basic_scheduler::SchedulerPriv) -> bool {
-    EXECUTOR.with(|current_executor| match current_executor.get() {
-        State::Basic(ptr) => ptr == basic_scheduler as *const _,
-        _ => false,
-    })
-}
-
 cfg_rt_threaded! {
     use crate::runtime::thread_pool;

20 changes: 20 additions & 0 deletions tokio/tests/fs.rs
@@ -0,0 +1,20 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::fs;
use tokio_test::assert_ok;

#[tokio::test]
async fn path_read_write() {
let temp = tempdir();
let dir = temp.path();

assert_ok!(fs::write(dir.join("bar"), b"bytes").await);
let out = assert_ok!(fs::read(dir.join("bar")).await);

assert_eq!(out, b"bytes");
}

fn tempdir() -> tempfile::TempDir {
tempfile::tempdir().unwrap()
}
15 changes: 15 additions & 0 deletions tokio/tests/rt_common.rs
@@ -198,6 +198,21 @@ rt_test! {
}
}

#[test]
fn spawn_await_chain() {
let mut rt = rt();

let out = rt.block_on(async {
assert_ok!(tokio::spawn(async {
assert_ok!(tokio::spawn(async {
"hello"
}).await)
}).await)
});

assert_eq!(out, "hello");
}

#[test]
fn outstanding_tasks_dropped() {
let mut rt = rt();
29 changes: 29 additions & 0 deletions tokio/tests/task_blocking.rs
@@ -0,0 +1,29 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::task;
use tokio_test::assert_ok;

use std::thread;
use std::time::Duration;

#[tokio::test]
async fn basic_blocking() {
// Run a few times
for _ in 0..100 {
let out = assert_ok!(
tokio::spawn(async {
assert_ok!(
task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(5));
"hello"
})
.await
)
})
.await
);

assert_eq!(out, "hello");
}
}
