Skip to content

Commit

Permalink
add one more test
Browse files Browse the repository at this point in the history
  • Loading branch information
agayev committed Nov 9, 2022
1 parent a6b34b9 commit 9897502
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 34 deletions.
14 changes: 9 additions & 5 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -13,7 +13,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Id, Schedule};
use crate::runtime::task::{Id, Schedule, TaskIdGuard};
use crate::util::linked_list;

use std::pin::Pin;
Expand Down Expand Up @@ -107,7 +107,7 @@ generate_addr_of_methods! {
/// Either the future or the output.
pub(super) enum Stage<T: Future> {
Running(T),
Finished(super::Result<T::Output>),
Finished(super::Result<T::Output>, Id),
Consumed,
}

Expand Down Expand Up @@ -200,6 +200,10 @@ impl<T: Future> CoreStage<T> {
pub(super) fn drop_future_or_output(&self) {
// Safety: the caller ensures mutual exclusion to the field.
unsafe {
let _task_id_guard = self.stage.with_mut(|ptr| match &*ptr {
Stage::Finished(Ok(_), id) => Some(TaskIdGuard::new(*id)),
_ => None,
});
self.set_stage(Stage::Consumed);
}
}
Expand All @@ -209,10 +213,10 @@ impl<T: Future> CoreStage<T> {
/// # Safety
///
/// The caller must ensure it is safe to mutate the `stage` field.
pub(super) fn store_output(&self, output: super::Result<T::Output>) {
pub(super) fn store_output(&self, output: super::Result<T::Output>, id: Id) {
// Safety: the caller ensures mutual exclusion to the field.
unsafe {
self.set_stage(Stage::Finished(output));
self.set_stage(Stage::Finished(output, id));
}
}

Expand All @@ -227,7 +231,7 @@ impl<T: Future> CoreStage<T> {
self.stage.with_mut(|ptr| {
// Safety:: the caller ensures mutual exclusion to the field.
match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
Stage::Finished(output) => output,
Stage::Finished(output, _) => output,
_ => panic!("JoinHandle polled after completion"),
}
})
Expand Down
25 changes: 5 additions & 20 deletions tokio/src/runtime/task/harness.rs
@@ -1,9 +1,8 @@
use crate::future::Future;
use crate::runtime::context;
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Id, JoinError, Notified, Schedule, Task};
use crate::runtime::task::{JoinError, Notified, Schedule, Task, TaskIdGuard};

use std::mem;
use std::mem::ManuallyDrop;
Expand Down Expand Up @@ -457,23 +456,9 @@ enum PollFuture {
Dealloc,
}

/// Guard that sets and clears the task id in the context during task execution
/// and cancellation.
struct TaskIdGuard {}
impl TaskIdGuard {
fn new(id: Id) -> Self {
context::set_current_task_id(Some(id));
TaskIdGuard {}
}
}
impl Drop for TaskIdGuard {
fn drop(&mut self) {
context::set_current_task_id(None);
}
}

/// Cancels the task and store the appropriate error in the stage field.
fn cancel_task<T: Future>(stage: &CoreStage<T>, id: super::Id) {
println!("cancel_task called");
// Drop the future from a panic guard.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let _task_id_guard = TaskIdGuard::new(id);
Expand All @@ -482,10 +467,10 @@ fn cancel_task<T: Future>(stage: &CoreStage<T>, id: super::Id) {

match res {
Ok(()) => {
stage.store_output(Err(JoinError::cancelled(id)));
stage.store_output(Err(JoinError::cancelled(id)), id);
}
Err(panic) => {
stage.store_output(Err(JoinError::panic(id, panic)));
stage.store_output(Err(JoinError::panic(id, panic)), id);
}
}
}
Expand Down Expand Up @@ -529,7 +514,7 @@ fn poll_future<T: Future, S: Schedule>(

// Catch and ignore panics if the future panics on drop.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.store_output(output);
core.store_output(output, id);
}));

if res.is_err() {
Expand Down
17 changes: 17 additions & 0 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -204,6 +204,8 @@ use std::{fmt, mem};
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);

use crate::runtime::context;

/// Returns the `Id` of the task.
///
/// # Panics
Expand All @@ -218,6 +220,21 @@ pub fn id() -> Id {
context::current_task_id()
}

/// Guard that sets and clears the task id in the context during task execution
/// and cancellation.
pub(crate) struct TaskIdGuard {}
impl TaskIdGuard {
fn new(id: Id) -> Self {
context::set_current_task_id(Some(id));
TaskIdGuard {}
}
}
impl Drop for TaskIdGuard {
fn drop(&mut self) {
context::set_current_task_id(None);
}
}

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
Expand Down
24 changes: 15 additions & 9 deletions tokio/tests/task_local.rs
Expand Up @@ -190,7 +190,7 @@ async fn task_ids_match_multi_thread() {

#[cfg(tokio_unstable)]
#[tokio::test(flavor = "multi_thread")]
async fn task_id_future_destructor_normal_completion() {
async fn task_id_future_destructor_completion() {
use tokio::task;

struct MyFuture {}
Expand Down Expand Up @@ -233,30 +233,36 @@ async fn task_id_future_destructor_abort() {
}

#[cfg(tokio_unstable)]
#[tokio::test(flavor = "multi_thread")]
async fn task_id_output_destructor() {
#[tokio::test(flavor = "current_thread")]
async fn task_id_output_destructor_handle_dropped_after_completion() {
use tokio::task;

struct MyOutput {}
impl Drop for MyOutput {
fn drop(&mut self) {
println!("task id: {}", task::id());
println!("output task id: {}", task::id());
}
}

struct MyFuture {}
struct MyFuture {
tx: Option<oneshot::Sender<()>>,
}
impl Future for MyFuture {
type Output = MyOutput;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let _ = self.tx.take().unwrap().send(());
Poll::Ready(MyOutput {})
}
}
impl Drop for MyFuture {
fn drop(&mut self) {
println!("task id: {}", task::id());
println!("future task id: {}", task::id());
}
}

drop(tokio::spawn(MyFuture {}));
let (tx, rx) = oneshot::channel();
let handle = tokio::spawn(MyFuture { tx: Some(tx) });
rx.await.unwrap();
drop(handle);
}

0 comments on commit 9897502

Please sign in to comment.