Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signal refactor #2835

Merged
merged 9 commits into from Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions benches/Cargo.toml
Expand Up @@ -8,6 +8,9 @@ edition = "2018"
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"

[target.'cfg(unix)'.dependencies]
libc = "0.2.42"

[[bench]]
name = "spawn"
path = "spawn.rs"
Expand All @@ -33,3 +36,8 @@ harness = false
name = "sync_semaphore"
path = "sync_semaphore.rs"
harness = false

[[bench]]
name = "signal"
path = "signal.rs"
harness = false
96 changes: 96 additions & 0 deletions benches/signal.rs
@@ -0,0 +1,96 @@
//! Benchmark the delay in propagating OS signals to any listeners.
#![cfg(unix)]

use bencher::{benchmark_group, benchmark_main, Bencher};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;

struct Spinner {
count: usize,
}

impl Future for Spinner {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count > 3 {
Poll::Ready(())
} else {
self.count += 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

impl Spinner {
fn new() -> Self {
Self { count: 0 }
}
}

pub fn send_signal(signal: libc::c_int) {
use libc::{getpid, kill};

unsafe {
assert_eq!(kill(getpid(), signal), 0);
}
}

fn many_signals(bench: &mut Bencher) {
let num_signals = 10;
let (tx, mut rx) = mpsc::channel(num_signals);

let rt = runtime::Builder::new()
// Intentionally single threaded to measure delays in propagating wakes
.basic_scheduler()
.enable_all()
.build()
.unwrap();

let spawn_signal = |kind| {
let mut tx = tx.clone();
rt.spawn(async move {
let mut signal = signal(kind).expect("failed to create signal");

while signal.recv().await.is_some() {
if tx.send(()).await.is_err() {
break;
}
}
});
};

for _ in 0..num_signals {
// Pick some random signals which don't terminate the test harness
spawn_signal(SignalKind::child());
spawn_signal(SignalKind::io());
}
drop(tx);

// Turn the runtime for a while to ensure that all the spawned
// tasks have been polled at least once
rt.block_on(Spinner::new());

bench.iter(|| {
rt.block_on(async {
send_signal(libc::SIGCHLD);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}

send_signal(libc::SIGIO);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
});
});
}

benchmark_group!(signal_group, many_signals,);

benchmark_main!(signal_group);
13 changes: 12 additions & 1 deletion tokio/src/io/registration.rs
Expand Up @@ -109,7 +109,18 @@ impl Registration {
where
T: Evented,
{
let handle = Handle::current();
Self::new_with_ready_and_handle(io, ready, Handle::current())
}

/// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>(
io: &T,
ready: mio::Ready,
handle: Handle,
) -> io::Result<Registration>
where
T: Evented,
{
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {
Expand Down
81 changes: 57 additions & 24 deletions tokio/src/runtime/builder.rs
@@ -1,7 +1,7 @@
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};

use std::fmt;
#[cfg(feature = "blocking")]
Expand Down Expand Up @@ -45,6 +45,9 @@ pub struct Builder {
/// Whether or not to enable the I/O driver
enable_io: bool,

/// Whether or not to enable the signal driver
enable_signal: bool,

/// Whether or not to enable the time driver
enable_time: bool,

Expand Down Expand Up @@ -98,6 +101,9 @@ impl Builder {
// I/O defaults to "off"
enable_io: false,

// Signal defaults to "off"
enable_signal: false,

// Time defaults to "off"
enable_time: false,

Expand Down Expand Up @@ -141,6 +147,9 @@ impl Builder {
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(feature = "io-driver")]
self.enable_io();
#[cfg(feature = "signal")]
#[cfg(not(loom))]
self.enable_signal();
#[cfg(feature = "time")]
self.enable_time();

Expand Down Expand Up @@ -359,14 +368,18 @@ impl Builder {
}
}

fn get_cfg(&self) -> driver::Cfg {
driver::Cfg {
enable_io: self.enable_io,
enable_signal: self.enable_signal,
enable_time: self.enable_time,
}
}

fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::Kind;

let clock = time::create_clock();

// Create I/O driver
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let spawner = Spawner::Shell;

Expand All @@ -377,9 +390,10 @@ impl Builder {
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
Expand Down Expand Up @@ -435,6 +449,30 @@ cfg_io_driver! {
}
}

cfg_signal! {
impl Builder {
/// Enables the signal driver.
///
/// Doing this enables using `tokio::signal` on the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new()
/// .enable_io() // Required by the signal driver
/// .enable_signal()
/// .build()
/// .unwrap();
/// ```
pub fn enable_signal(&mut self) -> &mut Self {
self.enable_signal = true;
self
}
}
}

cfg_time! {
impl Builder {
/// Enables the time driver.
Expand Down Expand Up @@ -478,12 +516,7 @@ cfg_rt_core! {
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};

let clock = time::create_clock();

// Create I/O driver
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;

let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
Expand All @@ -500,9 +533,10 @@ cfg_rt_core! {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
Expand Down Expand Up @@ -533,10 +567,8 @@ cfg_rt_threaded! {
let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");

let clock = time::create_clock();
let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

Expand All @@ -547,9 +579,10 @@ cfg_rt_threaded! {
// Create the runtime handle
let handle = Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

Expand Down
16 changes: 13 additions & 3 deletions tokio/src/runtime/context.rs
Expand Up @@ -14,24 +14,34 @@ cfg_blocking_impl! {
}

cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(),
})
}
}

cfg_signal! {
#[cfg(unix)]
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.signal_handle.clone(),
None => Default::default(),
})
}
}

cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => Default::default(),
})
}

cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.clock.clone()),
None => None,
Expand Down