Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: test more features with Miri #4397

Merged
merged 14 commits into from Feb 9, 2022
16 changes: 8 additions & 8 deletions .github/workflows/ci.yml
Expand Up @@ -167,18 +167,18 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{ env.nightly }}
components: miri
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
run: |
set -e
rustup component add miri
cargo miri setup
rm -rf tokio/tests

- name: miri
run: cargo miri test --features rt,rt-multi-thread,sync task
# Many of tests in tokio/tests and doctests use #[tokio::test] or
# #[tokio::main] that calls epoll_create1 that Miri does not support.
run: cargo miri test --features full --lib --no-fail-fast
working-directory: tokio
env:
MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-tag-raw-pointers
PROPTEST_CASES: 10

san:
name: san
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/fs/file/tests.rs
Expand Up @@ -228,6 +228,7 @@ fn flush_while_idle() {
}

#[test]
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn read_with_buffer_larger_than_max() {
// Chunks
let chunk_a = 16 * 1024;
Expand Down Expand Up @@ -299,6 +300,7 @@ fn read_with_buffer_larger_than_max() {
}

#[test]
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn write_with_buffer_larger_than_max() {
// Chunks
let chunk_a = 16 * 1024;
Expand Down
1 change: 1 addition & 0 deletions tokio/src/process/unix/orphan.rs
Expand Up @@ -280,6 +280,7 @@ pub(crate) mod test {
drop(signal_guard);
}

#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/task/harness.rs
Expand Up @@ -26,6 +26,10 @@ where
}
}

fn header_ptr(&self) -> NonNull<Header> {
self.cell.cast()
}

fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
Expand Down Expand Up @@ -93,7 +97,8 @@ where

match self.header().state.transition_to_running() {
TransitionToRunning::Success => {
let waker_ref = waker_ref::<T, S>(self.header());
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let res = poll_future(&self.core().stage, cx);

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -313,7 +313,7 @@ cfg_rt_multi_thread! {

impl<S: 'static> Task<S> {
fn into_raw(self) -> NonNull<Header> {
let ret = self.header().into();
let ret = self.raw.header_ptr();
mem::forget(self);
ret
}
Expand Down Expand Up @@ -427,7 +427,7 @@ unsafe impl<S> linked_list::Link for Task<S> {
type Target = Header;

fn as_raw(handle: &Task<S>) -> NonNull<Header> {
handle.header().into()
handle.raw.header_ptr()
}

unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/raw.rs
Expand Up @@ -57,6 +57,10 @@ impl RawTask {
RawTask { ptr }
}

pub(super) fn header_ptr(&self) -> NonNull<Header> {
self.ptr
}

/// Returns a reference to the task's meta structure.
///
/// Safe as `Header` is `Sync`.
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/task/waker.rs
Expand Up @@ -15,7 +15,7 @@ pub(super) struct WakerRef<'a, S: 'static> {

/// Returns a `WakerRef` which avoids having to pre-emptively increase the
/// refcount if there is no need to do so.
pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S>
pub(super) fn waker_ref<T, S>(header: &NonNull<Header>) -> WakerRef<'_, S>
hawkw marked this conversation as resolved.
Show resolved Hide resolved
where
T: Future,
S: Schedule,
Expand All @@ -28,7 +28,7 @@ where
// point and not an *owned* waker, we must ensure that `drop` is never
// called on this waker instance. This is done by wrapping it with
// `ManuallyDrop` and then never calling drop.
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(*header))) };

WakerRef {
waker,
Expand Down Expand Up @@ -77,7 +77,7 @@ where
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.clone");
(*header).state.ref_inc();
raw_waker::<T, S>(header)
raw_waker::<T, S>(ptr)
}

unsafe fn drop_waker<T, S>(ptr: *const ())
Expand Down Expand Up @@ -114,12 +114,12 @@ where
harness.wake_by_ref();
}

fn raw_waker<T, S>(header: *const Header) -> RawWaker
fn raw_waker<T, S>(header: NonNull<Header>) -> RawWaker
where
T: Future,
S: Schedule,
{
let ptr = header as *const ();
let ptr = header.as_ptr() as *const ();
let vtable = &RawWakerVTable::new(
clone_waker::<T, S>,
wake_by_val::<T, S>,
Expand Down
20 changes: 14 additions & 6 deletions tokio/src/runtime/tests/queue.rs
Expand Up @@ -101,13 +101,21 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}

const fn normal_or_miri(normal: usize, miri: usize) -> usize {
if cfg!(miri) {
miri
} else {
normal
}
}
hawkw marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn stress1() {
const NUM_ITER: usize = 1;
const NUM_STEAL: usize = 1_000;
const NUM_LOCAL: usize = 1_000;
const NUM_PUSH: usize = 500;
const NUM_POP: usize = 250;
const NUM_STEAL: usize = normal_or_miri(1_000, 10);
const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
const NUM_PUSH: usize = normal_or_miri(500, 10);
const NUM_POP: usize = normal_or_miri(250, 10);

let mut metrics = MetricsBatch::new();

Expand Down Expand Up @@ -169,8 +177,8 @@ fn stress1() {
#[test]
fn stress2() {
const NUM_ITER: usize = 1;
const NUM_TASKS: usize = 1_000_000;
const NUM_STEAL: usize = 1_000;
const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
const NUM_STEAL: usize = normal_or_miri(1_000, 10);

let mut metrics = MetricsBatch::new();

Expand Down
4 changes: 3 additions & 1 deletion tokio/src/signal/registry.rs
Expand Up @@ -202,7 +202,9 @@ mod tests {
registry.broadcast();

// Yield so the previous broadcast can get received
crate::time::sleep(std::time::Duration::from_millis(10)).await;
for _ in 0..100 {
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
crate::task::yield_now().await;
}

// Send subsequent signal
registry.record_event(0);
Expand Down
1 change: 1 addition & 0 deletions tokio/src/signal/reusable_box.rs
Expand Up @@ -151,6 +151,7 @@ impl<T> fmt::Debug for ReusableBoxFuture<T> {
}

#[cfg(test)]
#[cfg(not(miri))] // Miri breaks when you use Pin<&mut dyn Future>
mod test {
use super::ReusableBoxFuture;
use futures::future::FutureExt;
Expand Down
5 changes: 3 additions & 2 deletions tokio/src/sync/notify.rs
Expand Up @@ -130,6 +130,7 @@ enum NotificationType {
}

#[derive(Debug)]
#[repr(C)] // required by `linked_list::Link` impl
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
struct Waiter {
/// Intrusive linked-list pointers.
pointers: linked_list::Pointers<Waiter>,
Expand Down Expand Up @@ -731,8 +732,8 @@ unsafe impl linked_list::Link for Waiter {
ptr
}

unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
target.cast()
}
}

Expand Down
34 changes: 34 additions & 0 deletions tokio/src/sync/tests/notify.rs
Expand Up @@ -45,3 +45,37 @@ fn notify_clones_waker_before_lock() {
// The result doesn't matter, we're just testing that we don't deadlock.
let _ = future.poll(&mut cx);
}

#[test]
fn notify_simple() {
let notify = Notify::new();

let mut fut1 = tokio_test::task::spawn(notify.notified());
assert!(fut1.poll().is_pending());

let mut fut2 = tokio_test::task::spawn(notify.notified());
assert!(fut2.poll().is_pending());

notify.notify_waiters();

assert!(fut1.poll().is_ready());
assert!(fut2.poll().is_ready());
}

#[test]
#[cfg(not(target_arch = "wasm32"))]
fn watch_test() {
let rt = crate::runtime::Builder::new_current_thread()
.build()
.unwrap();

rt.block_on(async {
let (tx, mut rx) = crate::sync::watch::channel(());

crate::spawn(async move {
let _ = tx.send(());
});

let _ = rx.changed().await;
});
}
24 changes: 13 additions & 11 deletions tokio/src/time/driver/entry.rs
Expand Up @@ -326,15 +326,16 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, Ti
///
/// Note that this structure is located inside the `TimerEntry` structure.
#[derive(Debug)]
#[repr(C)] // required by `link_list::Link` impl
pub(crate) struct TimerShared {
/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,

/// Current state. This records whether the timer entry is currently under
/// the ownership of the driver, and if not, its current state (not
/// complete, fired, error, etc).
state: StateCell,

/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,

_p: PhantomPinned,
}

Expand Down Expand Up @@ -420,20 +421,21 @@ impl TimerShared {
/// padded. This contains the information that the driver thread accesses most
/// frequently to minimize contention. In particular, we move it away from the
/// waker, as the waker is updated on every poll.
#[repr(C)] // required by `link_list::Link` impl
struct TimerSharedPadded {
/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
///
/// Only accessed under the entry lock.
pointers: linked_list::Pointers<TimerShared>,
hawkw marked this conversation as resolved.
Show resolved Hide resolved

/// The expiration time for which this entry is currently registered.
/// Generally owned by the driver, but is accessed by the entry when not
/// registered.
cached_when: AtomicU64,

/// The true expiration time. Set by the timer future, read by the driver.
true_when: AtomicU64,

/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
///
/// Only accessed under the entry lock.
pointers: StdUnsafeCell<linked_list::Pointers<TimerShared>>,
}

impl std::fmt::Debug for TimerSharedPadded {
Expand All @@ -450,7 +452,7 @@ impl TimerSharedPadded {
Self {
cached_when: AtomicU64::new(0),
true_when: AtomicU64::new(0),
pointers: StdUnsafeCell::new(linked_list::Pointers::new()),
pointers: linked_list::Pointers::new(),
}
}
}
Expand All @@ -474,7 +476,7 @@ unsafe impl linked_list::Link for TimerShared {
unsafe fn pointers(
target: NonNull<Self::Target>,
) -> NonNull<linked_list::Pointers<Self::Target>> {
unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() }
target.cast()
}
}

Expand Down
20 changes: 17 additions & 3 deletions tokio/src/time/driver/tests/mod.rs
Expand Up @@ -27,7 +27,12 @@ fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
return loom::future::block_on(f);

#[cfg(not(loom))]
return futures::executor::block_on(f);
{
let rt = crate::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(f)
}
}

fn model(f: impl Fn() + Send + Sync + 'static) {
Expand Down Expand Up @@ -182,6 +187,15 @@ fn reset_future() {
})
}

#[cfg(not(loom))]
fn normal_or_miri<T>(normal: T, miri: T) -> T {
if cfg!(miri) {
miri
} else {
normal
}
}

#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
Expand All @@ -195,7 +209,7 @@ fn poll_process_levels() {

let mut entries = vec![];

for i in 0..1024 {
for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
&handle,
clock.now() + Duration::from_millis(i),
Expand All @@ -208,7 +222,7 @@ fn poll_process_levels() {
entries.push(entry);
}

for t in 1..1024 {
for t in 1..normal_or_miri(1024, 64) {
handle.process_at_time(t as u64);
for (deadline, future) in entries.iter_mut().enumerate() {
let mut context = Context::from_waker(noop_waker_ref());
Expand Down
5 changes: 3 additions & 2 deletions tokio/src/util/linked_list.rs
Expand Up @@ -352,6 +352,7 @@ mod tests {
use std::pin::Pin;

#[derive(Debug)]
#[repr(C)]
struct Entry {
pointers: Pointers<Entry>,
val: i32,
Expand All @@ -369,8 +370,8 @@ mod tests {
Pin::new_unchecked(&*ptr.as_ptr())
}

unsafe fn pointers(mut target: NonNull<Entry>) -> NonNull<Pointers<Entry>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Entry>) -> NonNull<Pointers<Entry>> {
target.cast()
}
}

Expand Down