Skip to content

Commit

Permalink
Use thread_local to reference LocalSet's context instead of scoped_th…
Browse files Browse the repository at this point in the history
…read_local (tokio-rs#4764)
  • Loading branch information
gftea committed Jun 17, 2022
1 parent a2007ac commit ea414c3
Showing 1 changed file with 52 additions and 21 deletions.
73 changes: 52 additions & 21 deletions tokio/src/task/local.rs
Expand Up @@ -4,12 +4,13 @@ use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
use crate::sync::AtomicWaker;
use crate::util::VecDequeCell;

use std::cell::Cell;
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Poll;

use pin_project_lite::pin_project;
Expand Down Expand Up @@ -215,7 +216,7 @@ cfg_rt! {
tick: Cell<u8>,

/// State available from thread-local.
context: Context,
context: Rc<Context>,

/// This type should not be Send.
_not_send: PhantomData<*const ()>,
Expand Down Expand Up @@ -252,7 +253,7 @@ pin_project! {
}
}

scoped_thread_local!(static CURRENT: Context);
thread_local!(static CURRENT: RefCell<Option<Rc<Context>>> = RefCell::new(None));

cfg_rt! {
/// Spawns a `!Send` future on the local task set.
Expand Down Expand Up @@ -302,10 +303,11 @@ cfg_rt! {
F::Output: 'static
{
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
match maybe_cx.borrow().as_ref() {
None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
Some(cx) => cx.spawn(future, name)
}

cx.spawn(future, name)
})
}
}
Expand All @@ -319,33 +321,41 @@ const MAX_TASKS_PER_TICK: usize = 61;
/// How often it check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

#[derive(Debug)]
pub struct LocalEnterGuard<'a> {
_guard: &'a LocalSet,
/// Context guard for LocalSet
#[allow(missing_debug_implementations)]
pub struct LocalEnterGuard(Option<Rc<Context>>);

impl Drop for LocalEnterGuard {
fn drop(&mut self) {
CURRENT.with(|ctx| {
// *ctx.borrow_mut() = self.0.take();
ctx.replace(self.0.take());
})
}
}

impl LocalSet {
/// Returns a new local task set.
pub fn new() -> LocalSet {
LocalSet {
tick: Cell::new(0),
context: Context {
context: Rc::new(Context {
owned: LocalOwnedTasks::new(),
queue: VecDequeCell::with_capacity(INITIAL_CAPACITY),
shared: Arc::new(Shared {
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
waker: AtomicWaker::new(),
}),
},
}),
_not_send: PhantomData,
}
}

/// Enter current LocalSet context
pub fn enter(&self) -> LocalEnterGuard<'_> {
CURRENT.inner.with(|c| {
c.set(&self.context as *const _ as *const ());
LocalEnterGuard { _guard: &self }
pub fn enter(&self) -> LocalEnterGuard {
CURRENT.with(|ctx| {
let old = ctx.borrow_mut().replace(self.context.clone());
LocalEnterGuard(old)
})
}

Expand Down Expand Up @@ -576,7 +586,26 @@ impl LocalSet {
}

fn with<T>(&self, f: impl FnOnce() -> T) -> T {
CURRENT.set(&self.context, f)
// CURRENT.set(&self.context, f)
CURRENT.with(|ctx| {
struct Reset<'a> {
ctx_ref: &'a RefCell<Option<Rc<Context>>>,
val: Option<Rc<Context>>,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.replace(self.val.take());
}
}
let old = ctx.borrow_mut().replace(self.context.clone());

let _reset = Reset {
ctx_ref: ctx,
val: old,
};

f()
})
}
}

Expand Down Expand Up @@ -699,7 +728,7 @@ impl<T: Future> Future for RunUntil<'_, T> {
impl Shared {
/// Schedule the provided task on the scheduler.
fn schedule(&self, task: task::Notified<Arc<Self>>) {
CURRENT.with(|maybe_cx| match maybe_cx {
CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
Expand All @@ -725,10 +754,12 @@ impl Shared {

impl task::Schedule for Arc<Shared> {
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() {
None => panic!("scheduler context missing"),
Some(cx) => {
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
}
})
}

Expand Down

0 comments on commit ea414c3

Please sign in to comment.