Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into io-driver-intrusive
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Sep 22, 2020
2 parents f64928b + 7ae5b7b commit 2ba0c30
Show file tree
Hide file tree
Showing 52 changed files with 1,166 additions and 741 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Expand Up @@ -9,7 +9,7 @@ name: CI
env:
RUSTFLAGS: -Dwarnings
RUST_BACKTRACE: 1
nightly: nightly-2020-07-12
nightly: nightly-2020-09-21
minrust: 1.45.2

jobs:
Expand Down Expand Up @@ -91,7 +91,7 @@ jobs:
run: cargo test --features full
working-directory: tokio
env:
RUSTFLAGS: '--cfg tokio_unstable'
RUSTFLAGS: --cfg tokio_unstable -Dwarnings

miri:
name: miri
Expand All @@ -110,7 +110,7 @@ jobs:
rm -rf tokio/tests
- name: miri
run: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task
run: cargo miri test --features rt-core,rt-threaded,rt-util,sync task
working-directory: tokio

cross:
Expand Down Expand Up @@ -156,7 +156,7 @@ jobs:
- name: check --each-feature --unstable
run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
env:
RUSTFLAGS: --cfg tokio_unstable
RUSTFLAGS: --cfg tokio_unstable -Dwarnings

minrust:
name: minrust
Expand Down Expand Up @@ -239,6 +239,6 @@ jobs:
run: cargo test --lib --release --features full -- --nocapture $SCOPE
working-directory: tokio
env:
RUSTFLAGS: --cfg loom --cfg tokio_unstable
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
LOOM_MAX_PREEMPTIONS: 2
SCOPE: ${{ matrix.scope }}
8 changes: 8 additions & 0 deletions benches/Cargo.toml
Expand Up @@ -8,6 +8,9 @@ edition = "2018"
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"

[target.'cfg(unix)'.dependencies]
libc = "0.2.42"

[[bench]]
name = "spawn"
path = "spawn.rs"
Expand All @@ -33,3 +36,8 @@ harness = false
name = "sync_semaphore"
path = "sync_semaphore.rs"
harness = false

[[bench]]
name = "signal"
path = "signal.rs"
harness = false
96 changes: 96 additions & 0 deletions benches/signal.rs
@@ -0,0 +1,96 @@
//! Benchmark the delay in propagating OS signals to any listeners.
#![cfg(unix)]

use bencher::{benchmark_group, benchmark_main, Bencher};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;

struct Spinner {
count: usize,
}

impl Future for Spinner {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count > 3 {
Poll::Ready(())
} else {
self.count += 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

impl Spinner {
fn new() -> Self {
Self { count: 0 }
}
}

pub fn send_signal(signal: libc::c_int) {
use libc::{getpid, kill};

unsafe {
assert_eq!(kill(getpid(), signal), 0);
}
}

fn many_signals(bench: &mut Bencher) {
let num_signals = 10;
let (tx, mut rx) = mpsc::channel(num_signals);

let rt = runtime::Builder::new()
// Intentionally single threaded to measure delays in propagating wakes
.basic_scheduler()
.enable_all()
.build()
.unwrap();

let spawn_signal = |kind| {
let mut tx = tx.clone();
rt.spawn(async move {
let mut signal = signal(kind).expect("failed to create signal");

while signal.recv().await.is_some() {
if tx.send(()).await.is_err() {
break;
}
}
});
};

for _ in 0..num_signals {
// Pick some random signals which don't terminate the test harness
spawn_signal(SignalKind::child());
spawn_signal(SignalKind::io());
}
drop(tx);

// Turn the runtime for a while to ensure that all the spawned
// tasks have been polled at least once
rt.block_on(Spinner::new());

bench.iter(|| {
rt.block_on(async {
send_signal(libc::SIGCHLD);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}

send_signal(libc::SIGIO);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
});
});
}

benchmark_group!(signal_group, many_signals,);

benchmark_main!(signal_group);
2 changes: 1 addition & 1 deletion tokio-util/src/lib.rs
Expand Up @@ -6,7 +6,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(intra_doc_link_resolution_failure)]
#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
Expand Down
19 changes: 3 additions & 16 deletions tokio-util/tests/framed_read.rs
Expand Up @@ -185,8 +185,8 @@ fn read_partial_would_block_then_err() {
#[test]
fn huge_size() {
let mut task = task::spawn(());
let data = [0; 32 * 1024];
let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder);
let data = &[0; 32 * 1024][..];
let mut framed = FramedRead::new(data, BigDecoder);

task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
Expand All @@ -212,7 +212,7 @@ fn huge_size() {
#[test]
fn data_remaining_is_error() {
let mut task = task::spawn(());
let slice = Slice(&[0; 5]);
let slice = &[0; 5][..];
let mut framed = FramedRead::new(slice, U32Decoder);

task.enter(|cx, _| {
Expand Down Expand Up @@ -280,16 +280,3 @@ impl AsyncRead for Mock {
}
}
}

// TODO this newtype is necessary because `&[u8]` does not currently implement `AsyncRead`
struct Slice<'a>(&'a [u8]);

impl AsyncRead for Slice<'_> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
44 changes: 0 additions & 44 deletions tokio/src/future/pending.rs

This file was deleted.

13 changes: 12 additions & 1 deletion tokio/src/io/registration.rs
Expand Up @@ -87,7 +87,18 @@ impl Registration {
where
T: Evented,
{
let handle = Handle::current();
Self::new_with_ready_and_handle(io, ready, Handle::current())
}

/// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>(
io: &T,
ready: mio::Ready,
handle: Handle,
) -> io::Result<Registration>
where
T: Evented,
{
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {
Expand Down
14 changes: 8 additions & 6 deletions tokio/src/io/seek.rs
Expand Up @@ -4,12 +4,14 @@ use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Seek<'a, S: ?Sized> {
seek: &'a mut S,
pos: Option<SeekFrom>,
cfg_io_util! {
/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Seek<'a, S: ?Sized> {
seek: &'a mut S,
pos: Option<SeekFrom>,
}
}

pub(crate) fn seek<S>(seek: &mut S, pos: SeekFrom) -> Seek<'_, S>
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/async_read_ext.rs
Expand Up @@ -1067,7 +1067,7 @@ cfg_io_util! {
/// (See also the [`crate::fs::read_to_string`] convenience function for
/// reading from a file.)
///
/// [`crate::fs::read_to_string`]: crate::fs::read_to_string::read_to_string
/// [`crate::fs::read_to_string`]: fn@crate::fs::read_to_string
fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self>
where
Self: Unpin,
Expand Down

0 comments on commit 2ba0c30

Please sign in to comment.