Skip to content

Commit

Permalink
signal: move driver to runtime thread (#2835)
Browse files Browse the repository at this point in the history
Refactors the signal infrastructure to move the driver to the runtime
thread. This follows the model put forth by the I/O driver and time
driver.
  • Loading branch information
ipetkov committed Sep 22, 2020
1 parent e09b90e commit 7ae5b7b
Show file tree
Hide file tree
Showing 14 changed files with 532 additions and 240 deletions.
8 changes: 8 additions & 0 deletions benches/Cargo.toml
Expand Up @@ -8,6 +8,9 @@ edition = "2018"
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"

[target.'cfg(unix)'.dependencies]
libc = "0.2.42"

[[bench]]
name = "spawn"
path = "spawn.rs"
Expand All @@ -33,3 +36,8 @@ harness = false
name = "sync_semaphore"
path = "sync_semaphore.rs"
harness = false

[[bench]]
name = "signal"
path = "signal.rs"
harness = false
96 changes: 96 additions & 0 deletions benches/signal.rs
@@ -0,0 +1,96 @@
//! Benchmark the delay in propagating OS signals to any listeners.
#![cfg(unix)]

use bencher::{benchmark_group, benchmark_main, Bencher};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;

struct Spinner {
count: usize,
}

impl Future for Spinner {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count > 3 {
Poll::Ready(())
} else {
self.count += 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

impl Spinner {
fn new() -> Self {
Self { count: 0 }
}
}

pub fn send_signal(signal: libc::c_int) {
use libc::{getpid, kill};

unsafe {
assert_eq!(kill(getpid(), signal), 0);
}
}

fn many_signals(bench: &mut Bencher) {
let num_signals = 10;
let (tx, mut rx) = mpsc::channel(num_signals);

let rt = runtime::Builder::new()
// Intentionally single threaded to measure delays in propagating wakes
.basic_scheduler()
.enable_all()
.build()
.unwrap();

let spawn_signal = |kind| {
let mut tx = tx.clone();
rt.spawn(async move {
let mut signal = signal(kind).expect("failed to create signal");

while signal.recv().await.is_some() {
if tx.send(()).await.is_err() {
break;
}
}
});
};

for _ in 0..num_signals {
// Pick some random signals which don't terminate the test harness
spawn_signal(SignalKind::child());
spawn_signal(SignalKind::io());
}
drop(tx);

// Turn the runtime for a while to ensure that all the spawned
// tasks have been polled at least once
rt.block_on(Spinner::new());

bench.iter(|| {
rt.block_on(async {
send_signal(libc::SIGCHLD);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}

send_signal(libc::SIGIO);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
});
});
}

benchmark_group!(signal_group, many_signals,);

benchmark_main!(signal_group);
13 changes: 12 additions & 1 deletion tokio/src/io/registration.rs
Expand Up @@ -109,7 +109,18 @@ impl Registration {
where
T: Evented,
{
let handle = Handle::current();
Self::new_with_ready_and_handle(io, ready, Handle::current())
}

/// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>(
io: &T,
ready: mio::Ready,
handle: Handle,
) -> io::Result<Registration>
where
T: Evented,
{
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {
Expand Down
47 changes: 23 additions & 24 deletions tokio/src/runtime/builder.rs
@@ -1,7 +1,7 @@
use crate::loom::sync::Mutex;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};

use std::fmt;
#[cfg(feature = "blocking")]
Expand Down Expand Up @@ -359,14 +359,17 @@ impl Builder {
}
}

fn get_cfg(&self) -> driver::Cfg {
driver::Cfg {
enable_io: self.enable_io,
enable_time: self.enable_time,
}
}

fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::Kind;

let clock = time::create_clock();

// Create I/O driver
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let spawner = Spawner::Shell;

Expand All @@ -377,9 +380,10 @@ impl Builder {
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
Expand Down Expand Up @@ -478,12 +482,7 @@ cfg_rt_core! {
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};

let clock = time::create_clock();

// Create I/O driver
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;

let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
Expand All @@ -500,9 +499,10 @@ cfg_rt_core! {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
Expand Down Expand Up @@ -533,10 +533,8 @@ cfg_rt_threaded! {
let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");

let clock = time::create_clock();
let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

Expand All @@ -547,9 +545,10 @@ cfg_rt_threaded! {
// Create the runtime handle
let handle = Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

Expand Down
16 changes: 13 additions & 3 deletions tokio/src/runtime/context.rs
Expand Up @@ -14,24 +14,34 @@ cfg_blocking_impl! {
}

cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(),
})
}
}

cfg_signal! {
#[cfg(unix)]
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.signal_handle.clone(),
None => Default::default(),
})
}
}

cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => Default::default(),
})
}

cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.clock.clone()),
None => None,
Expand Down

0 comments on commit 7ae5b7b

Please sign in to comment.