Skip to content

Commit

Permalink
Shared: fix false detection of inner panics (#2576)
Browse files Browse the repository at this point in the history
Fixes #2575.
  • Loading branch information
crepererum authored and taiki-e committed Aug 14, 2022
1 parent 8ace968 commit e075ff3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
22 changes: 13 additions & 9 deletions futures-util/src/future/future/shared.rs
Expand Up @@ -262,19 +262,20 @@ where
let waker = waker_ref(&inner.notifier);
let mut cx = Context::from_waker(&waker);

struct Reset<'a>(&'a AtomicUsize);
struct Reset<'a> {
state: &'a AtomicUsize,
did_not_panic: bool,
}

impl Drop for Reset<'_> {
fn drop(&mut self) {
use std::thread;

if thread::panicking() {
self.0.store(POISONED, SeqCst);
if !self.did_not_panic {
self.state.store(POISONED, SeqCst);
}
}
}

let _reset = Reset(&inner.notifier.state);
let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false };

let output = {
let future = unsafe {
Expand All @@ -284,12 +285,15 @@ where
}
};

match future.poll(&mut cx) {
let poll_result = future.poll(&mut cx);
reset.did_not_panic = true;

match poll_result {
Poll::Pending => {
if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
{
// Success
drop(_reset);
drop(reset);
this.inner = Some(inner);
return Poll::Pending;
} else {
Expand All @@ -313,7 +317,7 @@ where
waker.wake();
}

drop(_reset); // Make borrow checker happy
drop(reset); // Make borrow checker happy
drop(wakers_guard);

// Safety: We're in the COMPLETE state
Expand Down
32 changes: 32 additions & 0 deletions futures/tests/future_shared.rs
Expand Up @@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool};
use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
use futures::task::LocalSpawn;
use std::cell::{Cell, RefCell};
use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::task::Poll;
use std::thread;
Expand Down Expand Up @@ -194,3 +195,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() {
// has returned pending
assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
}

#[test]
#[should_panic(expected = "inner future panicked during poll")]
fn panic_while_poll() {
let fut = futures::future::poll_fn::<i8, _>(|_cx| panic!("test")).shared();

let fut_captured = fut.clone();
std::panic::catch_unwind(AssertUnwindSafe(|| {
block_on(fut_captured);
}))
.unwrap_err();

block_on(fut);
}

#[test]
#[should_panic(expected = "test_marker")]
fn poll_while_panic() {
struct S;

impl Drop for S {
fn drop(&mut self) {
let fut = futures::future::ready(1).shared();
assert_eq!(block_on(fut.clone()), 1);
assert_eq!(block_on(fut), 1);
}
}

let _s = S {};
panic!("test_marker");
}

0 comments on commit e075ff3

Please sign in to comment.