Skip to content

Commit

Permalink
FlattenUnordered: check waker panic
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Jan 9, 2021
1 parent 15f5866 commit 9bc4e4e
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion futures/tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::sync::Arc;

use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::{self, Future};
use futures::lock::Mutex;
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use futures::FutureExt;
use futures::{ready, FutureExt};
use futures_test::task::noop_context;

#[test]
Expand Down Expand Up @@ -243,6 +246,49 @@ fn flatten_unordered() {
assert_eq!(fm_unordered, fl_unordered);
assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});

// waker panics
{
let stream = Arc::new(Mutex::new(
Interchanger { polled: false, base: 0, wake_immediately: false }
.take(10)
.flat_map_unordered(10, |s| s.map(identity)),
));

struct PanicWaker;

impl ArcWake for PanicWaker {
fn wake_by_ref(_arc_self: &Arc<Self>) {
panic!("WAKE UP");
}
}

std::thread::spawn({
let stream = stream.clone();
move || {
let mut st = poll_fn(|cx| {
let mut lock = ready!(stream.lock().poll_unpin(cx));

let panic_waker = waker(Arc::new(PanicWaker));
let mut panic_cx = Context::from_waker(&panic_waker);
let data = ready!(lock.poll_next_unpin(&mut panic_cx));

Poll::Ready(data)
});

block_on(st.next())
}
})
.join()
.unwrap_err();

block_on(async move {
let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
values.sort();

assert_eq!(values, (0..60).collect::<Vec<u8>>());
});
}
}

#[cfg(feature = "executor")] // executor::
Expand Down

0 comments on commit 9bc4e4e

Please sign in to comment.