New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rework binding of new tasks #3955
Changes from 12 commits
fea12c5
d74748d
935ee11
7597e85
48b6a80
dfbc538
75d5a00
17699f6
8f24c4e
2e39e4d
283298e
0bc8f20
deefadc
5fbdb24
7f27166
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ pub(super) enum Stage<T: Future> { | |
impl<T: Future, S: Schedule> Cell<T, S> { | ||
/// Allocates a new task cell, containing the header, trailer, and core | ||
/// structures. | ||
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> { | ||
pub(super) fn new(future: T, scheduler: S, state: State) -> Box<Cell<T, S>> { | ||
#[cfg(all(tokio_unstable, feature = "tracing"))] | ||
let id = future.id(); | ||
Box::new(Cell { | ||
|
@@ -107,7 +107,7 @@ impl<T: Future, S: Schedule> Cell<T, S> { | |
}, | ||
core: Core { | ||
scheduler: Scheduler { | ||
scheduler: UnsafeCell::new(None), | ||
scheduler: UnsafeCell::new(Some(scheduler)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this ever If it is never There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I intentionally did not include that in this PR because the diff is already really long, and it wasn't that simple to remove the |
||
}, | ||
stage: CoreStage { | ||
stage: UnsafeCell::new(Stage::Running(future)), | ||
|
@@ -125,34 +125,6 @@ impl<S: Schedule> Scheduler<S> { | |
self.scheduler.with_mut(f) | ||
} | ||
|
||
/// Bind a scheduler to the task. | ||
/// | ||
/// This only happens on the first poll and must be preceded by a call to | ||
/// `is_bound` to determine if binding is appropriate or not. | ||
/// | ||
/// # Safety | ||
/// | ||
/// Binding must not be done concurrently since it will mutate the task | ||
/// core through a shared reference. | ||
pub(super) fn bind_scheduler(&self, task: Task<S>) { | ||
// This function may be called concurrently, but the __first__ time it | ||
// is called, the caller has unique access to this field. All subsequent | ||
// concurrent calls will be via the `Waker`, which will "happens after" | ||
// the first poll. | ||
// | ||
// In other words, it is always safe to read the field and it is safe to | ||
// write to the field when it is `None`. | ||
debug_assert!(!self.is_bound()); | ||
|
||
// Bind the task to the scheduler | ||
let scheduler = S::bind(task); | ||
|
||
// Safety: As `scheduler` is not set, this is the first poll | ||
self.scheduler.with_mut(|ptr| unsafe { | ||
*ptr = Some(scheduler); | ||
}); | ||
} | ||
|
||
/// Returns true if the task is bound to a scheduler. | ||
pub(super) fn is_bound(&self) -> bool { | ||
// Safety: never called concurrently w/ a mutation. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,15 +75,13 @@ impl<T: 'static> Inject<T> { | |
|
||
/// Pushes a value into the queue. | ||
/// | ||
/// Returns `Err(task)` if pushing fails due to the queue being shutdown. | ||
/// The caller is expected to call `shutdown()` on the task **if and only | ||
/// if** it is a newly spawned task. | ||
pub(crate) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>> { | ||
/// This does nothing if the queue is closed. | ||
pub(crate) fn push(&self, task: task::Notified<T>) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I know, but I would like you to confirm it :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's because the pushed task may be a new task, and if the push fails for a new task, the caller needs to know so it can call It is what I described with the following in the original PR text:
|
||
// Acquire queue lock | ||
let mut p = self.pointers.lock(); | ||
|
||
if p.is_closed { | ||
return Err(task); | ||
return; | ||
} | ||
|
||
// safety: only mutated with the lock held | ||
|
@@ -102,7 +100,6 @@ impl<T: 'static> Inject<T> { | |
p.tail = Some(task); | ||
|
||
self.len.store(len + 1, Release); | ||
Ok(()) | ||
} | ||
|
||
/// Pushes several values into the queue. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this is the only place
task::joinable(...)
is used still. Is that because there is no set of owned tasks here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's only used for blocking tasks and tests.