New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rework binding of new tasks #3955
Conversation
This is marked as a draft because it seems like I broke |
The reason |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The review below mostly contains comments that are intended to be helpful for people reviewing this PR.
/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is | ||
/// set. A new task is immediately pushed into the run queue for execution and | ||
/// starts with the `NOTIFIED` flag set. | ||
const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED; | ||
const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this change, the third ref-count is incremented when lazily binding the executor. Since we eagerly bind the executor now, that ref-count is here from the beginning.
pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult { | ||
pub(super) fn transition_to_running(&self) -> UpdateResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ref-count parameter was used only when lazily binding the scheduler.
@@ -1,21 +1,27 @@ | |||
#[cfg(not(all(tokio_unstable, feature = "tracing")))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes in this file are to construct a method like tokio::runtime::task::joinable
, except that it immediately binds the task to a NoopScheduler
and returns a Notified
rather than an UnboundTask
. The method is used in tests of the inject queue only.
@@ -1,43 +1,209 @@ | |||
use crate::runtime::task::{self, OwnedTasks, Schedule, Task}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file has miri tests. Miri will catch memory leaks.
tokio/src/runtime/thread_pool/mod.rs
Outdated
// is shutting down. The task must be explicitly shutdown at this point. | ||
task.shutdown(); | ||
} | ||
worker::Shared::bind_new_task(&self.shared, task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The self: &Arc<Self>
calling convention is not stable on MSRV, so use free-standing function instead.
@@ -211,7 +211,7 @@ impl Read for &'_ File { | |||
assert!(dst.len() >= data.len()); | |||
assert!(dst.len() <= 16 * 1024, "actual = {}", dst.len()); // max buffer | |||
|
|||
&mut dst[..data.len()].copy_from_slice(&data); | |||
dst[..data.len()].copy_from_slice(&data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just fixes a warning from the nightly compiler.
// Close the OwnedTasks to prevent spawning new tasks during shutdown. | ||
worker.shared.owned.close(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do this when closing the inject queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably? I would guess we should close at the same point.
Also, having two "closed" states to update makes me uncomfortable. It seems like a potential source of bugs. Is there a way to have a single closed flag (either the owned set or the inject queue)?
What would happen if we only had shared.owned
track closed instead of the inject queue? Then, when closing, there are no more tasks that can be spanwed, all existing tasks are "shutdown", then the queues drained? The inject queue should no longer be used I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see the comment in deefadc
for why we need both close bits.
tokio/src/runtime/task/mod.rs
Outdated
// This method is used by the blocking spawner. | ||
pub(crate) fn into_notified(self, scheduler: S) -> Notified<S> | ||
where | ||
T: Send, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An unfortunate consequence of this is that we break the fast path in the destructor of join handles for blocking tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you point to the fast path you are referencing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio/src/runtime/task/mod.rs
Outdated
@@ -50,20 +50,68 @@ unsafe impl<S> Sync for Task<S> {} | |||
#[repr(transparent)] | |||
pub(crate) struct Notified<S: 'static>(Task<S>); | |||
|
|||
/// A task not yet bound to an executor. This object holds two ref-counts to | |||
/// the task to enable it to be split into two for free. | |||
pub(crate) struct UnboundTask<T, S: 'static> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you remind me why we have UnboundTask
vs. immediately bind in spawn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is the more direct change to do it like this, but I will think about whether I can remove it without changing too much code.
/// if** it is a newly spawned task. | ||
pub(crate) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>> { | ||
/// This does nothing if the queue is closed. | ||
pub(crate) fn push(&self, task: task::Notified<T>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the Result
return not needed anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I know, but I would like you to confirm it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because the pushed task may be a new task, and if the push fails for a new task, the caller needs to know so it can call shutdown
on the task. With this PR, that's no longer necessary as binding the new task happens before pushing the notification, and if binding the task succeeded, then the runtime has been given responsibility for cleaning the new task up, even if submitting a notification for the task fails.
It is what I described with the following in the original PR text:
The PR also introduces a conceptual change for the meaning of the
Notified
type — that type is now "just" a notification, and someone who holds aNotified
is never responsible for cleaning up the task.
@@ -107,7 +107,7 @@ impl<T: Future, S: Schedule> Cell<T, S> { | |||
}, | |||
core: Core { | |||
scheduler: Scheduler { | |||
scheduler: UnsafeCell::new(None), | |||
scheduler: UnsafeCell::new(Some(scheduler)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this ever None
anymore?
If it is never None
, you don't need to remove Option
in this PR but we should clean it up in a follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intentionally did not include that in this PR because the diff is already really long, and it wasn't that simple to remove the None
case.
I will probably need to keep reviewing tomorrow. Could you write a bit about why |
The only difference between The PR currently does not enforce that local tasks are polled on the right thread, since polling happens through |
tokio/src/runtime/handle.rs
Outdated
@@ -213,7 +213,7 @@ impl Handle { | |||
#[cfg(not(all(tokio_unstable, feature = "tracing")))] | |||
let _ = name; | |||
|
|||
let (task, handle) = task::joinable(fut); | |||
let (task, handle) = task::joinable(fut, NoopSchedule); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this is the only place task::joinable(...)
is used still. Is that because there is no set of owned tasks here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's only used for blocking tasks and tests.
tokio/src/runtime/task/mod.rs
Outdated
S: Schedule, | ||
{ | ||
let raw = RawTask::new::<_, S>(task); | ||
let raw = RawTask::new::<_, S>(task, scheduler); | ||
raw.header().state.ref_dec(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a comment explaining what this ref_dec()
is for. My guess is it is because there is no task set owning this task?
I would probably rename the joinable
function to something like unowned(...)
or something like that to make it more obvious what the fn is for now.
tokio/src/runtime/task/state.rs
Outdated
@@ -54,11 +54,11 @@ const REF_ONE: usize = 1 << REF_COUNT_SHIFT; | |||
|
|||
/// State a task is initialized with | |||
/// | |||
/// A task is initialized with two references: one for the scheduler and one for | |||
/// A task is initialized with three references: two for the scheduler and one for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably be more explicit here. "One for OwnedTasks
(held by the scheduler), one for Notified<_>
(used to poll the task), and one for JoinHandle
.
or something.
tokio/src/runtime/task/list.rs
Outdated
T: Future + Send + 'static, | ||
T::Output: Send + 'static, | ||
{ | ||
let raw = RawTask::new::<T, S>(task, scheduler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there is a bunch of duplication between the two bind
fns (and task::joinable
). If possible, it would be nice to unify some.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! A nice step in the right direction. I couldn't find anything major. I left notes inline. I know there are follow-up PRs planned to further improve things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
This PR reworks how new tasks are bound to the runtime they are spawned on. The gist of the PR is that the task is now bound to the runtime immediately as part of spawning the task, instead of being bound lazily on first poll. The PR also introduces a conceptual change for the meaning of the
Notified
type — that type is now "just" a notification, and someone who holds aNotified
is never responsible for cleaning up the task.To implement this, I make the following changes:
bind
method from theSchedule
trait.schedule
methods to just consume and drop theNotified
when the runtime is shutting down, instead of reporting an error so the task can be shut down immediately when it is newly spawned.UnboundTask
type, which is the type used by a task until it is bound to a scheduler.bind
method onOwnedTasks
that takes anUnboundTask
and returns aNotified
for that task.OwnedTasks
closeable to prevent binding new tasks to it when the runtime is shutting down.