New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rework binding of new tasks #3955
Merged
Merged
Changes from 9 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
fea12c5
runtime: rework binding of new tasks
Darksonn d74748d
Fix runtime/tests
Darksonn 935ee11
Fix miri tests and rustfmt
Darksonn 7597e85
Fix InstrumentedFuture vs std::Future with tracing feature
Darksonn 48b6a80
Bind scheduler on into_notified
Darksonn dfbc538
Fix loom tests
Darksonn 75d5a00
rustfmt
Darksonn 17699f6
Fix imports
Darksonn 8f24c4e
Improve comments
Darksonn 2e39e4d
Remove UnboundTask
Darksonn 283298e
fix miri tests
Darksonn 0bc8f20
rustfmt
Darksonn deefadc
Address reviews
Darksonn 5fbdb24
Add cfg_rt_multi_thread on is_closed
Darksonn 7f27166
Merge branch 'master' into rework-task-binding
Darksonn File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,33 +1,127 @@ | ||
//! This module has containers for storing the tasks spawned on a scheduler. The | ||
//! `OwnedTasks` container is thread-safe but can only store tasks that | ||
//! implement Send. The `LocalOwnedTasks` container is not thread safe, but can | ||
//! store non-Send tasks. | ||
//! | ||
//! The collections can be closed to prevent adding new tasks during shutdown of | ||
//! the scheduler with the collection. | ||
|
||
use crate::future::Future; | ||
use crate::loom::sync::Mutex; | ||
use crate::runtime::task::Task; | ||
use crate::runtime::task::{Notified, Schedule, Task, UnboundTask}; | ||
use crate::util::linked_list::{Link, LinkedList}; | ||
|
||
use std::marker::PhantomData; | ||
|
||
pub(crate) struct OwnedTasks<S: 'static> { | ||
list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>, | ||
inner: Mutex<OwnedTasksInner<S>>, | ||
} | ||
struct OwnedTasksInner<S: 'static> { | ||
list: LinkedList<Task<S>, <Task<S> as Link>::Target>, | ||
closed: bool, | ||
} | ||
|
||
pub(crate) struct LocalOwnedTasks<S: 'static> { | ||
list: LinkedList<Task<S>, <Task<S> as Link>::Target>, | ||
closed: bool, | ||
_not_send: PhantomData<*const ()>, | ||
} | ||
|
||
impl<S: 'static> OwnedTasks<S> { | ||
pub(crate) fn new() -> Self { | ||
Self { | ||
list: Mutex::new(LinkedList::new()), | ||
inner: Mutex::new(OwnedTasksInner { | ||
list: LinkedList::new(), | ||
closed: false, | ||
}), | ||
} | ||
} | ||
|
||
pub(crate) fn push_front(&self, task: Task<S>) { | ||
self.list.lock().push_front(task); | ||
/// Bind the provided task to this OwnedTasks instance. This fails if the | ||
/// OwnedTasks has been closed. | ||
pub(crate) fn bind<T>( | ||
&self, | ||
task: UnboundTask<T, S>, | ||
scheduler: S, | ||
) -> Result<Notified<S>, UnboundTask<T, S>> | ||
where | ||
S: Schedule, | ||
T: Future + Send + 'static, | ||
{ | ||
let mut lock = self.inner.lock(); | ||
if lock.closed { | ||
Err(task) | ||
} else { | ||
let (notified, task) = task.bind_and_split(scheduler); | ||
lock.list.push_front(task); | ||
Ok(notified) | ||
} | ||
} | ||
|
||
pub(crate) fn pop_back(&self) -> Option<Task<S>> { | ||
self.list.lock().pop_back() | ||
self.inner.lock().list.pop_back() | ||
} | ||
|
||
/// The caller must ensure that if the provided task is stored in a | ||
/// linked list, then it is in this linked list. | ||
pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> { | ||
self.list.lock().remove(task.header().into()) | ||
self.inner.lock().list.remove(task.header().into()) | ||
} | ||
|
||
pub(crate) fn is_empty(&self) -> bool { | ||
self.inner.lock().list.is_empty() | ||
} | ||
|
||
/// Close the OwnedTasks. This prevents adding new tasks to the collection. | ||
pub(crate) fn close(&self) { | ||
self.inner.lock().closed = true; | ||
} | ||
} | ||
|
||
impl<S: 'static> LocalOwnedTasks<S> { | ||
pub(crate) fn new() -> Self { | ||
Self { | ||
list: LinkedList::new(), | ||
closed: false, | ||
_not_send: PhantomData, | ||
} | ||
} | ||
|
||
pub(crate) fn bind<T>( | ||
&mut self, | ||
task: UnboundTask<T, S>, | ||
scheduler: S, | ||
) -> Result<Notified<S>, UnboundTask<T, S>> | ||
where | ||
S: Schedule, | ||
T: Future + 'static, | ||
{ | ||
if self.closed { | ||
Err(task) | ||
} else { | ||
let (notified, task) = task.bind_and_split(scheduler); | ||
self.list.push_front(task); | ||
Ok(notified) | ||
} | ||
} | ||
|
||
pub(crate) fn pop_back(&mut self) -> Option<Task<S>> { | ||
self.list.pop_back() | ||
} | ||
|
||
/// The caller must ensure that if the provided task is stored in a | ||
/// linked list, then it is in this linked list. | ||
pub(crate) unsafe fn remove(&mut self, task: &Task<S>) -> Option<Task<S>> { | ||
self.list.remove(task.header().into()) | ||
} | ||
|
||
pub(crate) fn is_empty(&self) -> bool { | ||
self.list.lock().is_empty() | ||
self.list.is_empty() | ||
} | ||
|
||
/// Close the LocalOwnedTasks. This prevents adding new tasks to the | ||
/// collection. | ||
pub(crate) fn close(&mut self) { | ||
self.closed = true; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the
Result
return not needed anymore?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I know, but I would like you to confirm it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because the pushed task may be a new task, and if the push fails for a new task, the caller needs to know so it can call
shutdown
on the task. With this PR, that's no longer necessary as binding the new task happens before pushing the notification, and if binding the task succeeded, then the runtime has been given responsibility for cleaning the new task up, even if submitting a notification for the task fails.It is what I described with the following in the original PR text: