Skip to content

Commit

Permalink
task: use pin-project for TaskLocalFuture (#5758)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Jun 10, 2023
1 parent a2941e4 commit e63d0f1
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 62 deletions.
2 changes: 1 addition & 1 deletion tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ signal = ["tokio/signal"]

[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
pin-project-lite = "0.2.7"
tokio = { version = "1.15.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-io = { version = "0.3.0", optional = true }
futures-util = { version = "0.3.0", optional = true }
pin-project-lite = "0.2.0"
pin-project-lite = "0.2.7"
slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true }

Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ autocfg = "1.1"
[dependencies]
tokio-macros = { version = "~2.1.0", path = "../tokio-macros", optional = true }

pin-project-lite = "0.2.0"
pin-project-lite = "0.2.7"

# Everything else is optional...
bytes = { version = "1.0.0", optional = true }
Expand Down
121 changes: 62 additions & 59 deletions tokio/src/task/task_local.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use pin_project_lite::pin_project;
use std::cell::RefCell;
use std::error::Error;
use std::future::Future;
Expand Down Expand Up @@ -299,60 +300,75 @@ impl<T: 'static> fmt::Debug for LocalKey<T> {
}
}

/// A future that sets a value `T` of a task local for the future `F` during
/// its execution.
///
/// The value of the task-local must be `'static` and will be dropped on the
/// completion of the future.
///
/// Created by the function [`LocalKey::scope`](self::LocalKey::scope).
///
/// ### Examples
///
/// ```
/// # async fn dox() {
/// tokio::task_local! {
/// static NUMBER: u32;
/// }
///
/// NUMBER.scope(1, async move {
/// println!("task local value: {}", NUMBER.get());
/// }).await;
/// # }
/// ```
// Doesn't use pin_project due to custom Drop.
pub struct TaskLocalFuture<T, F>
where
T: 'static,
{
local: &'static LocalKey<T>,
slot: Option<T>,
future: Option<F>,
_pinned: PhantomPinned,
pin_project! {
/// A future that sets a value `T` of a task local for the future `F` during
/// its execution.
///
/// The value of the task-local must be `'static` and will be dropped on the
/// completion of the future.
///
/// Created by the function [`LocalKey::scope`](self::LocalKey::scope).
///
/// ### Examples
///
/// ```
/// # async fn dox() {
/// tokio::task_local! {
/// static NUMBER: u32;
/// }
///
/// NUMBER.scope(1, async move {
/// println!("task local value: {}", NUMBER.get());
/// }).await;
/// # }
/// ```
pub struct TaskLocalFuture<T, F>
where
T: 'static,
{
local: &'static LocalKey<T>,
slot: Option<T>,
#[pin]
future: Option<F>,
#[pin]
_pinned: PhantomPinned,
}

impl<T: 'static, F> PinnedDrop for TaskLocalFuture<T, F> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if mem::needs_drop::<F>() && this.future.is_some() {
// Drop the future while the task-local is set, if possible. Otherwise
// the future is dropped normally when the `Option<F>` field drops.
let mut future = this.future;
let _ = this.local.scope_inner(this.slot, || {
future.set(None);
});
}
}
}
}

impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
type Output = F::Output;

#[track_caller]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// safety: The TaskLocalFuture struct is `!Unpin` so there is no way to
// move `self.future` from now on.
let this = unsafe { Pin::into_inner_unchecked(self) };
let mut future_opt = unsafe { Pin::new_unchecked(&mut this.future) };

let res =
this.local
.scope_inner(&mut this.slot, || match future_opt.as_mut().as_pin_mut() {
Some(fut) => {
let res = fut.poll(cx);
if res.is_ready() {
future_opt.set(None);
}
Some(res)
let this = self.project();
let mut future_opt = this.future;

let res = this
.local
.scope_inner(this.slot, || match future_opt.as_mut().as_pin_mut() {
Some(fut) => {
let res = fut.poll(cx);
if res.is_ready() {
future_opt.set(None);
}
None => None,
});
Some(res)
}
None => None,
});

match res {
Ok(Some(res)) => res,
Expand All @@ -362,19 +378,6 @@ impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
}
}

impl<T: 'static, F> Drop for TaskLocalFuture<T, F> {
fn drop(&mut self) {
if mem::needs_drop::<F>() && self.future.is_some() {
// Drop the future while the task-local is set, if possible. Otherwise
// the future is dropped normally when the `Option<F>` field drops.
let future = &mut self.future;
let _ = self.local.scope_inner(&mut self.slot, || {
*future = None;
});
}
}
}

impl<T: 'static, F> fmt::Debug for TaskLocalFuture<T, F>
where
T: fmt::Debug,
Expand Down

0 comments on commit e63d0f1

Please sign in to comment.