From 9853878c2a6800af6d8fe30f29abc26c0c613697 Mon Sep 17 00:00:00 2001 From: gftea Date: Sun, 12 Jun 2022 22:25:44 +0200 Subject: [PATCH 01/10] Add LocalSet::enter (#4736) --- tokio/src/task/local.rs | 12 ++++++++++++ tokio/tests/task_local_set.rs | 17 +++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 8c59876d603..bb283ed05d6 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -319,6 +319,10 @@ const MAX_TASKS_PER_TICK: usize = 61; /// How often it check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; +pub struct EnterGuard<'a>{ + _guard: &'a LocalSet, +} + impl LocalSet { /// Returns a new local task set. pub fn new() -> LocalSet { @@ -336,6 +340,14 @@ impl LocalSet { } } + /// Enter current LocalSet context + pub fn enter(&self) -> EnterGuard<'_> { + CURRENT.inner.with(|c| { + c.set(&self.context as *const _ as *const ()); + EnterGuard {_guard: &self} + }) + } + /// Spawns a `!Send` task onto the local task set. /// /// This task is guaranteed to be run on the current thread. diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index f8a35d0ede8..8ed14bd18e6 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -126,6 +126,23 @@ async fn local_threadpool_timer() { }) .await; } +#[test] +fn enter_guard_spawn() { + let local = LocalSet::new(); + let _guard = local.enter(); + // Run the local task set. + + let join = task::spawn_local(async { + true + }); + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + local.block_on(&rt, async move { + assert!(join.await.unwrap()); + }); +} #[test] // This will panic, since the thread that calls `block_on` cannot use From a457a3ac7bc2763578a321577cd4a6e8bdb28278 Mon Sep 17 00:00:00 2001 From: gftea Date: Mon, 13 Jun 2022 12:37:40 +0200 Subject: [PATCH 02/10] add Debug trait implementation for EnterGuard (#4736) --- tokio/src/task/local.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index bb283ed05d6..e41fc6287af 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -319,6 +319,7 @@ const MAX_TASKS_PER_TICK: usize = 61; /// How often it check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; +#[derive(Debug)] pub struct EnterGuard<'a>{ _guard: &'a LocalSet, } From 8edb09e266a07b782615e85d5b229355c66f22b5 Mon Sep 17 00:00:00 2001 From: gftea Date: Tue, 14 Jun 2022 13:08:26 +0200 Subject: [PATCH 03/10] fix format by rustfmt (#4736) --- tokio/src/task/local.rs | 4 ++-- tokio/tests/task_local_set.rs | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index e41fc6287af..a98f5ca6602 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -320,7 +320,7 @@ const MAX_TASKS_PER_TICK: usize = 61; const REMOTE_FIRST_INTERVAL: u8 = 31; #[derive(Debug)] -pub struct EnterGuard<'a>{ +pub struct EnterGuard<'a> { _guard: &'a LocalSet, } @@ -345,7 +345,7 @@ impl LocalSet { pub fn enter(&self) -> EnterGuard<'_> { CURRENT.inner.with(|c| { c.set(&self.context as *const _ as *const ()); - EnterGuard {_guard: &self} + EnterGuard { _guard: &self } }) } diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index 8ed14bd18e6..cf0a8651234 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -132,13 +132,11 @@ fn enter_guard_spawn() { let _guard = local.enter(); // Run the local task set. - let join = task::spawn_local(async { - true - }); + let join = task::spawn_local(async { true }); let rt = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + .enable_all() + .build() + .unwrap(); local.block_on(&rt, async move { assert!(join.await.unwrap()); }); From a0431cdc9e90bb90dbe80af4209a2e3aa9cd9eb2 Mon Sep 17 00:00:00 2001 From: gftea Date: Thu, 16 Jun 2022 10:57:55 +0200 Subject: [PATCH 04/10] reexport LocalEnterGuard (#4736) --- tokio/src/task/local.rs | 6 +++--- tokio/src/task/mod.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index a98f5ca6602..2fa01197180 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -320,7 +320,7 @@ const MAX_TASKS_PER_TICK: usize = 61; const REMOTE_FIRST_INTERVAL: u8 = 31; #[derive(Debug)] -pub struct EnterGuard<'a> { +pub struct LocalEnterGuard<'a> { _guard: &'a LocalSet, } @@ -342,10 +342,10 @@ impl LocalSet { } /// Enter current LocalSet context - pub fn enter(&self) -> EnterGuard<'_> { + pub fn enter(&self) -> LocalEnterGuard<'_> { CURRENT.inner.with(|c| { c.set(&self.context as *const _ as *const ()); - EnterGuard { _guard: &self } + LocalEnterGuard { _guard: &self } }) } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 8ea73fb48de..409cd1327b1 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -297,7 +297,7 @@ cfg_rt! { } mod local; - pub use local::{spawn_local, LocalSet}; + pub use local::{spawn_local, LocalSet, LocalEnterGuard}; mod task_local; pub use task_local::LocalKey; From ea414c3ba451313193d6e219f7ea029d55ae4fd3 Mon Sep 17 00:00:00 2001 From: gftea Date: Fri, 17 Jun 2022 23:08:29 +0200 Subject: [PATCH 05/10] Use thread_local to reference LocalSet's context instead of scoped_thread_local (#4764) --- tokio/src/task/local.rs | 73 +++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 2fa01197180..7334fbeda64 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -4,12 +4,13 @@ use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task}; use crate::sync::AtomicWaker; use crate::util::VecDequeCell; -use std::cell::Cell; +use std::cell::{Cell, RefCell}; use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; +use std::rc::Rc; use std::task::Poll; use pin_project_lite::pin_project; @@ -215,7 +216,7 @@ cfg_rt! { tick: Cell, /// State available from thread-local. - context: Context, + context: Rc, /// This type should not be Send. _not_send: PhantomData<*const ()>, @@ -252,7 +253,7 @@ pin_project! { } } -scoped_thread_local!(static CURRENT: Context); +thread_local!(static CURRENT: RefCell>> = RefCell::new(None)); cfg_rt! { /// Spawns a `!Send` future on the local task set. @@ -302,10 +303,11 @@ cfg_rt! { F::Output: 'static { CURRENT.with(|maybe_cx| { - let cx = maybe_cx - .expect("`spawn_local` called from outside of a `task::LocalSet`"); + match maybe_cx.borrow().as_ref() { + None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), + Some(cx) => cx.spawn(future, name) + } - cx.spawn(future, name) }) } } @@ -319,9 +321,17 @@ const MAX_TASKS_PER_TICK: usize = 61; /// How often it check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; -#[derive(Debug)] -pub struct LocalEnterGuard<'a> { - _guard: &'a LocalSet, +/// Context guard for LocalSet +#[allow(missing_debug_implementations)] +pub struct LocalEnterGuard(Option>); + +impl Drop for LocalEnterGuard { + fn drop(&mut self) { + CURRENT.with(|ctx| { + // *ctx.borrow_mut() = self.0.take(); + ctx.replace(self.0.take()); + }) + } } impl LocalSet { @@ -329,23 +339,23 @@ impl LocalSet { pub fn new() -> LocalSet { LocalSet { tick: Cell::new(0), - context: Context { + context: Rc::new(Context { owned: LocalOwnedTasks::new(), queue: VecDequeCell::with_capacity(INITIAL_CAPACITY), shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), waker: AtomicWaker::new(), }), - }, + }), _not_send: PhantomData, } } /// Enter current LocalSet context - pub fn enter(&self) -> LocalEnterGuard<'_> { - CURRENT.inner.with(|c| { - c.set(&self.context as *const _ as *const ()); - LocalEnterGuard { _guard: &self } + pub fn enter(&self) -> LocalEnterGuard { + CURRENT.with(|ctx| { + let old = ctx.borrow_mut().replace(self.context.clone()); + LocalEnterGuard(old) }) } @@ -576,7 +586,26 @@ impl LocalSet { } fn with(&self, f: impl FnOnce() -> T) -> T { - CURRENT.set(&self.context, f) + // CURRENT.set(&self.context, f) + CURRENT.with(|ctx| { + struct Reset<'a> { + ctx_ref: &'a RefCell>>, + val: Option>, + } + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.ctx_ref.replace(self.val.take()); + } + } + let old = ctx.borrow_mut().replace(self.context.clone()); + + let _reset = Reset { + ctx_ref: ctx, + val: old, + }; + + f() + }) } } @@ -699,7 +728,7 @@ impl Future for RunUntil<'_, T> { impl Shared { /// Schedule the provided task on the scheduler. fn schedule(&self, task: task::Notified>) { - CURRENT.with(|maybe_cx| match maybe_cx { + CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() { Some(cx) if cx.shared.ptr_eq(self) => { cx.queue.push_back(task); } @@ -725,10 +754,12 @@ impl Shared { impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option> { - CURRENT.with(|maybe_cx| { - let cx = maybe_cx.expect("scheduler context missing"); - assert!(cx.shared.ptr_eq(self)); - cx.owned.remove(task) + CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() { + None => panic!("scheduler context missing"), + Some(cx) => { + assert!(cx.shared.ptr_eq(self)); + cx.owned.remove(task) + } }) } From 720a8de54bb2f1735813cd8701fe163beb4aa65a Mon Sep 17 00:00:00 2001 From: gftea Date: Tue, 28 Jun 2022 20:04:48 +0200 Subject: [PATCH 06/10] wrap thread_local in Cell instead RefCell (#4764) --- tokio/src/task/local.rs | 66 ++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 7334fbeda64..587c45694a5 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -4,7 +4,7 @@ use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task}; use crate::sync::AtomicWaker; use crate::util::VecDequeCell; -use std::cell::{Cell, RefCell}; +use std::cell::Cell; use std::collections::VecDeque; use std::fmt; use std::future::Future; @@ -253,7 +253,7 @@ pin_project! { } } -thread_local!(static CURRENT: RefCell>> = RefCell::new(None)); +thread_local!(static CURRENT: Cell>> = Cell::new(None)); cfg_rt! { /// Spawns a `!Send` future on the local task set. @@ -303,7 +303,8 @@ cfg_rt! { F::Output: 'static { CURRENT.with(|maybe_cx| { - match maybe_cx.borrow().as_ref() { + let ctx = clone_rc(maybe_cx); + match ctx { None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), Some(cx) => cx.spawn(future, name) } @@ -328,7 +329,6 @@ pub struct LocalEnterGuard(Option>); impl Drop for LocalEnterGuard { fn drop(&mut self) { CURRENT.with(|ctx| { - // *ctx.borrow_mut() = self.0.take(); ctx.replace(self.0.take()); }) } @@ -354,7 +354,7 @@ impl LocalSet { /// Enter current LocalSet context pub fn enter(&self) -> LocalEnterGuard { CURRENT.with(|ctx| { - let old = ctx.borrow_mut().replace(self.context.clone()); + let old = ctx.replace(Some(self.context.clone())); LocalEnterGuard(old) }) } @@ -586,10 +586,9 @@ impl LocalSet { } fn with(&self, f: impl FnOnce() -> T) -> T { - // CURRENT.set(&self.context, f) CURRENT.with(|ctx| { struct Reset<'a> { - ctx_ref: &'a RefCell>>, + ctx_ref: &'a Cell>>, val: Option>, } impl<'a> Drop for Reset<'a> { @@ -597,7 +596,7 @@ impl LocalSet { self.ctx_ref.replace(self.val.take()); } } - let old = ctx.borrow_mut().replace(self.context.clone()); + let old = ctx.replace(Some(self.context.clone())); let _reset = Reset { ctx_ref: ctx, @@ -725,23 +724,33 @@ impl Future for RunUntil<'_, T> { } } +fn clone_rc(rc: &Cell>>) -> Option> { + let value = rc.take(); + let cloned = value.clone(); + rc.set(value); + cloned +} + impl Shared { /// Schedule the provided task on the scheduler. fn schedule(&self, task: task::Notified>) { - CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() { - Some(cx) if cx.shared.ptr_eq(self) => { - cx.queue.push_back(task); - } - _ => { - // First check whether the queue is still there (if not, the - // LocalSet is dropped). Then push to it if so, and if not, - // do nothing. - let mut lock = self.queue.lock(); - - if let Some(queue) = lock.as_mut() { - queue.push_back(task); - drop(lock); - self.waker.wake(); + CURRENT.with(|maybe_cx| { + let ctx = clone_rc(maybe_cx); + match ctx { + Some(cx) if cx.shared.ptr_eq(self) => { + cx.queue.push_back(task); + } + _ => { + // First check whether the queue is still there (if not, the + // LocalSet is dropped). Then push to it if so, and if not, + // do nothing. + let mut lock = self.queue.lock(); + + if let Some(queue) = lock.as_mut() { + queue.push_back(task); + drop(lock); + self.waker.wake(); + } } } }); @@ -754,11 +763,14 @@ impl Shared { impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option> { - CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() { - None => panic!("scheduler context missing"), - Some(cx) => { - assert!(cx.shared.ptr_eq(self)); - cx.owned.remove(task) + CURRENT.with(|maybe_cx| { + let ctx = clone_rc(maybe_cx); + match ctx { + None => panic!("scheduler context missing"), + Some(cx) => { + assert!(cx.shared.ptr_eq(self)); + cx.owned.remove(task) + } } }) } From 8045d5db80f9434e63bbd42eea24a0a0f6349fe0 Mon Sep 17 00:00:00 2001 From: gftea Date: Tue, 28 Jun 2022 22:15:44 +0200 Subject: [PATCH 07/10] fix compilation error in CI --- tokio/src/task/local.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3d4df14d20f..6a48157102d 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -686,7 +686,7 @@ cfg_unstable! { /// [`JoinHandle`]: struct@crate::task::JoinHandle pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self { // TODO: This should be set as a builder - Arc::get_mut(&mut self.context.shared) + Arc::get_mut(&mut Rc::get_mut(&mut self.context).expect("TODO: context should not panic").shared) .expect("TODO: we shouldn't panic") .unhandled_panic = behavior; self From 702a4ca3e4023fd90357f1eb9617df63d1f4f024 Mon Sep 17 00:00:00 2001 From: gftea <1705787+gftea@users.noreply.github.com> Date: Wed, 29 Jun 2022 15:19:15 +0200 Subject: [PATCH 08/10] Update tokio/src/task/local.rs Co-authored-by: Alice Ryhl --- tokio/src/task/local.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 6a48157102d..41d42a6e7f1 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -686,8 +686,9 @@ cfg_unstable! { /// [`JoinHandle`]: struct@crate::task::JoinHandle pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self { // TODO: This should be set as a builder - Arc::get_mut(&mut Rc::get_mut(&mut self.context).expect("TODO: context should not panic").shared) - .expect("TODO: we shouldn't panic") + Rc::get_mut(&mut self.context) + .and_then(|ctx| Arc::get_mut(&mut ctx.shared)) + .expect("Unhandled Panic behavior modified after starting LocalSet") .unhandled_panic = behavior; self } From 35641974e4bcc94d351944aa5ab2e3d9106521e9 Mon Sep 17 00:00:00 2001 From: gftea Date: Wed, 29 Jun 2022 15:24:57 +0200 Subject: [PATCH 09/10] task: implement debug for enter guard (#4736) --- tokio/src/task/local.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 41d42a6e7f1..fe4b9a75a07 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -331,7 +331,6 @@ const MAX_TASKS_PER_TICK: usize = 61; const REMOTE_FIRST_INTERVAL: u8 = 31; /// Context guard for LocalSet -#[allow(missing_debug_implementations)] pub struct LocalEnterGuard(Option>); impl Drop for LocalEnterGuard { @@ -342,6 +341,12 @@ impl Drop for LocalEnterGuard { } } +impl fmt::Debug for LocalEnterGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalEnterGuard").finish() + } +} + impl LocalSet { /// Returns a new local task set. pub fn new() -> LocalSet { From a0d81b8bf0e833a3fa76194dacd3464410c0c5c5 Mon Sep 17 00:00:00 2001 From: gftea <1705787+gftea@users.noreply.github.com> Date: Tue, 5 Jul 2022 22:03:54 +0200 Subject: [PATCH 10/10] Update comments tokio/src/task/local.rs Co-authored-by: Alice Ryhl --- tokio/src/task/local.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index fe4b9a75a07..d37607a4b5d 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -367,7 +367,12 @@ impl LocalSet { } } - /// Enter current LocalSet context + /// Enters the context of this `LocalSet`. + /// + /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose + /// context you are inside. + /// + /// [`spawn_local`]: fn@crate::task::spawn_local pub fn enter(&self) -> LocalEnterGuard { CURRENT.with(|ctx| { let old = ctx.replace(Some(self.context.clone()));