Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: use pin-project for TaskLocalFuture #5758

Merged
merged 4 commits into from
Jun 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ signal = ["tokio/signal"]

[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
pin-project-lite = "0.2.7"
tokio = { version = "1.15.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-io = { version = "0.3.0", optional = true }
futures-util = { version = "0.3.0", optional = true }
pin-project-lite = "0.2.0"
pin-project-lite = "0.2.7"
slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true }

Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ autocfg = "1.1"
[dependencies]
tokio-macros = { version = "~2.1.0", path = "../tokio-macros", optional = true }

pin-project-lite = "0.2.0"
pin-project-lite = "0.2.7"

# Everything else is optional...
bytes = { version = "1.0.0", optional = true }
Expand Down
121 changes: 62 additions & 59 deletions tokio/src/task/task_local.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use pin_project_lite::pin_project;
use std::cell::RefCell;
use std::error::Error;
use std::future::Future;
Expand Down Expand Up @@ -299,60 +300,75 @@ impl<T: 'static> fmt::Debug for LocalKey<T> {
}
}

/// A future that sets a value `T` of a task local for the future `F` during
/// its execution.
///
/// The value of the task-local must be `'static` and will be dropped on the
/// completion of the future.
///
/// Created by the function [`LocalKey::scope`](self::LocalKey::scope).
///
/// ### Examples
///
/// ```
/// # async fn dox() {
/// tokio::task_local! {
/// static NUMBER: u32;
/// }
///
/// NUMBER.scope(1, async move {
/// println!("task local value: {}", NUMBER.get());
/// }).await;
/// # }
/// ```
// Doesn't use pin_project due to custom Drop.
pub struct TaskLocalFuture<T, F>
where
T: 'static,
{
local: &'static LocalKey<T>,
slot: Option<T>,
future: Option<F>,
_pinned: PhantomPinned,
pin_project! {
/// A future that sets a value `T` of a task local for the future `F` during
/// its execution.
///
/// The value of the task-local must be `'static` and will be dropped on the
/// completion of the future.
///
/// Created by the function [`LocalKey::scope`](self::LocalKey::scope).
///
/// ### Examples
///
/// ```
/// # async fn dox() {
/// tokio::task_local! {
/// static NUMBER: u32;
/// }
///
/// NUMBER.scope(1, async move {
/// println!("task local value: {}", NUMBER.get());
/// }).await;
/// # }
/// ```
pub struct TaskLocalFuture<T, F>
where
T: 'static,
{
local: &'static LocalKey<T>,
slot: Option<T>,
#[pin]
future: Option<F>,
#[pin]
_pinned: PhantomPinned,
Comment on lines +333 to +334
Copy link
Contributor Author

@BugenZhao BugenZhao Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really clear about whether we need this Future to be !Unpin, as making it Unpin seems to make life easier in some situations.

For example, if the feature in #5743 is added, it'll be common that we poll this with &mut fut to avoid the future being consumed, then we're able to take the value back after getting Ready. If the future is always !Unpin, we are not able to take back the ownership of the future. :(

Currently, I leave it untouched according to #3943 (comment). IIUC, this is considered to be a breaking change in the compatibility?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the PhantomPinned field would not be breaking, but we could never undo it because that would be breaking.

As for into_value, you make a good point. That method would not be able to take ownership of the future.

}

impl<T: 'static, F> PinnedDrop for TaskLocalFuture<T, F> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if mem::needs_drop::<F>() && this.future.is_some() {
// Drop the future while the task-local is set, if possible. Otherwise
// the future is dropped normally when the `Option<F>` field drops.
let mut future = this.future;
let _ = this.local.scope_inner(this.slot, || {
future.set(None);
});
}
}
}
}

impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
type Output = F::Output;

#[track_caller]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// safety: The TaskLocalFuture struct is `!Unpin` so there is no way to
// move `self.future` from now on.
let this = unsafe { Pin::into_inner_unchecked(self) };
let mut future_opt = unsafe { Pin::new_unchecked(&mut this.future) };

let res =
this.local
.scope_inner(&mut this.slot, || match future_opt.as_mut().as_pin_mut() {
Some(fut) => {
let res = fut.poll(cx);
if res.is_ready() {
future_opt.set(None);
}
Some(res)
let this = self.project();
let mut future_opt = this.future;

let res = this
.local
.scope_inner(this.slot, || match future_opt.as_mut().as_pin_mut() {
Some(fut) => {
let res = fut.poll(cx);
if res.is_ready() {
future_opt.set(None);
}
None => None,
});
Some(res)
}
None => None,
});

match res {
Ok(Some(res)) => res,
Expand All @@ -362,19 +378,6 @@ impl<T: 'static, F: Future> Future for TaskLocalFuture<T, F> {
}
}

impl<T: 'static, F> Drop for TaskLocalFuture<T, F> {
fn drop(&mut self) {
if mem::needs_drop::<F>() && self.future.is_some() {
// Drop the future while the task-local is set, if possible. Otherwise
// the future is dropped normally when the `Option<F>` field drops.
let future = &mut self.future;
let _ = self.local.scope_inner(&mut self.slot, || {
*future = None;
});
}
}
}

impl<T: 'static, F> fmt::Debug for TaskLocalFuture<T, F>
where
T: fmt::Debug,
Expand Down