New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rework binding of new tasks #3955
Changes from all commits
fea12c5
d74748d
935ee11
7597e85
48b6a80
dfbc538
75d5a00
17699f6
8f24c4e
2e39e4d
283298e
0bc8f20
deefadc
5fbdb24
7f27166
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,15 +75,13 @@ impl<T: 'static> Inject<T> { | |
|
||
/// Pushes a value into the queue. | ||
/// | ||
/// Returns `Err(task)` if pushing fails due to the queue being shutdown. | ||
/// The caller is expected to call `shutdown()` on the task **if and only | ||
/// if** it is a newly spawned task. | ||
pub(crate) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>> { | ||
/// This does nothing if the queue is closed. | ||
pub(crate) fn push(&self, task: task::Notified<T>) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I know, but I would like you to confirm it :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's because the pushed task may be a new task, and if the push fails for a new task, the caller needs to know so it can call It is what I described with the following in the original PR text:
|
||
// Acquire queue lock | ||
let mut p = self.pointers.lock(); | ||
|
||
if p.is_closed { | ||
return Err(task); | ||
return; | ||
} | ||
|
||
// safety: only mutated with the lock held | ||
|
@@ -102,7 +100,6 @@ impl<T: 'static> Inject<T> { | |
p.tail = Some(task); | ||
|
||
self.len.store(len + 1, Release); | ||
Ok(()) | ||
} | ||
|
||
/// Pushes several values into the queue. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,33 +1,141 @@ | ||
//! This module has containers for storing the tasks spawned on a scheduler. The | ||
//! `OwnedTasks` container is thread-safe but can only store tasks that | ||
//! implement Send. The `LocalOwnedTasks` container is not thread safe, but can | ||
//! store non-Send tasks. | ||
//! | ||
//! The collections can be closed to prevent adding new tasks during shutdown of | ||
//! the scheduler with the collection. | ||
|
||
use crate::future::Future; | ||
use crate::loom::sync::Mutex; | ||
use crate::runtime::task::Task; | ||
use crate::runtime::task::{JoinHandle, Notified, Schedule, Task}; | ||
use crate::util::linked_list::{Link, LinkedList}; | ||
|
||
use std::marker::PhantomData; | ||
|
||
pub(crate) struct OwnedTasks<S: 'static> { | ||
list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>, | ||
inner: Mutex<OwnedTasksInner<S>>, | ||
} | ||
struct OwnedTasksInner<S: 'static> { | ||
list: LinkedList<Task<S>, <Task<S> as Link>::Target>, | ||
closed: bool, | ||
} | ||
|
||
pub(crate) struct LocalOwnedTasks<S: 'static> { | ||
list: LinkedList<Task<S>, <Task<S> as Link>::Target>, | ||
closed: bool, | ||
_not_send: PhantomData<*const ()>, | ||
} | ||
|
||
impl<S: 'static> OwnedTasks<S> { | ||
pub(crate) fn new() -> Self { | ||
Self { | ||
list: Mutex::new(LinkedList::new()), | ||
inner: Mutex::new(OwnedTasksInner { | ||
list: LinkedList::new(), | ||
closed: false, | ||
}), | ||
} | ||
} | ||
|
||
pub(crate) fn push_front(&self, task: Task<S>) { | ||
self.list.lock().push_front(task); | ||
/// Bind the provided task to this OwnedTasks instance. This fails if the | ||
/// OwnedTasks has been closed. | ||
pub(crate) fn bind<T>( | ||
&self, | ||
task: T, | ||
scheduler: S, | ||
) -> (JoinHandle<T::Output>, Option<Notified<S>>) | ||
where | ||
S: Schedule, | ||
T: Future + Send + 'static, | ||
T::Output: Send + 'static, | ||
{ | ||
let (task, notified, join) = super::new_task(task, scheduler); | ||
|
||
let mut lock = self.inner.lock(); | ||
if lock.closed { | ||
drop(lock); | ||
drop(task); | ||
notified.shutdown(); | ||
(join, None) | ||
} else { | ||
lock.list.push_front(task); | ||
(join, Some(notified)) | ||
} | ||
} | ||
|
||
pub(crate) fn pop_back(&self) -> Option<Task<S>> { | ||
self.list.lock().pop_back() | ||
self.inner.lock().list.pop_back() | ||
} | ||
|
||
/// The caller must ensure that if the provided task is stored in a | ||
/// linked list, then it is in this linked list. | ||
pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> { | ||
self.list.lock().remove(task.header().into()) | ||
self.inner.lock().list.remove(task.header().into()) | ||
} | ||
|
||
pub(crate) fn is_empty(&self) -> bool { | ||
self.inner.lock().list.is_empty() | ||
} | ||
|
||
#[cfg(feature = "rt-multi-thread")] | ||
pub(crate) fn is_closed(&self) -> bool { | ||
self.inner.lock().closed | ||
} | ||
|
||
/// Close the OwnedTasks. This prevents adding new tasks to the collection. | ||
pub(crate) fn close(&self) { | ||
self.inner.lock().closed = true; | ||
} | ||
} | ||
|
||
impl<S: 'static> LocalOwnedTasks<S> { | ||
pub(crate) fn new() -> Self { | ||
Self { | ||
list: LinkedList::new(), | ||
closed: false, | ||
_not_send: PhantomData, | ||
} | ||
} | ||
|
||
pub(crate) fn bind<T>( | ||
&mut self, | ||
task: T, | ||
scheduler: S, | ||
) -> (JoinHandle<T::Output>, Option<Notified<S>>) | ||
where | ||
S: Schedule, | ||
T: Future + 'static, | ||
T::Output: 'static, | ||
{ | ||
let (task, notified, join) = super::new_task(task, scheduler); | ||
|
||
if self.closed { | ||
drop(task); | ||
notified.shutdown(); | ||
(join, None) | ||
} else { | ||
self.list.push_front(task); | ||
(join, Some(notified)) | ||
} | ||
} | ||
|
||
pub(crate) fn pop_back(&mut self) -> Option<Task<S>> { | ||
self.list.pop_back() | ||
} | ||
|
||
/// The caller must ensure that if the provided task is stored in a | ||
/// linked list, then it is in this linked list. | ||
pub(crate) unsafe fn remove(&mut self, task: &Task<S>) -> Option<Task<S>> { | ||
self.list.remove(task.header().into()) | ||
} | ||
|
||
pub(crate) fn is_empty(&self) -> bool { | ||
self.list.lock().is_empty() | ||
self.list.is_empty() | ||
} | ||
|
||
/// Close the LocalOwnedTasks. This prevents adding new tasks to the | ||
/// collection. | ||
pub(crate) fn close(&mut self) { | ||
self.closed = true; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this ever
None
anymore?If it is never
None
, you don't need to removeOption
in this PR but we should clean it up in a follow up.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intentionally did not include that in this PR because the diff is already really long, and it wasn't that simple to remove the
None
case.