From 7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Tue, 22 Sep 2020 15:40:44 -0700 Subject: [PATCH] signal: move driver to runtime thread (#2835) Refactors the signal infrastructure to move the driver to the runtime thread. This follows the model put forth by the I/O driver and time driver. --- benches/Cargo.toml | 8 ++ benches/signal.rs | 96 ++++++++++++++++ tokio/src/io/registration.rs | 13 ++- tokio/src/runtime/builder.rs | 47 ++++---- tokio/src/runtime/context.rs | 16 ++- tokio/src/runtime/driver.rs | 196 ++++++++++++++++++++++++++++++++ tokio/src/runtime/handle.rs | 11 +- tokio/src/runtime/io.rs | 63 ---------- tokio/src/runtime/mod.rs | 11 +- tokio/src/runtime/park.rs | 10 +- tokio/src/runtime/shell.rs | 8 +- tokio/src/runtime/time.rs | 59 ---------- tokio/src/signal/unix.rs | 81 ++----------- tokio/src/signal/unix/driver.rs | 153 +++++++++++++++++++++++++ 14 files changed, 532 insertions(+), 240 deletions(-) create mode 100644 benches/signal.rs create mode 100644 tokio/src/runtime/driver.rs delete mode 100644 tokio/src/runtime/io.rs delete mode 100644 tokio/src/runtime/time.rs create mode 100644 tokio/src/signal/unix/driver.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 3a90943f059..cca0ece5eb8 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -8,6 +8,9 @@ edition = "2018" tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } bencher = "0.1.5" +[target.'cfg(unix)'.dependencies] +libc = "0.2.42" + [[bench]] name = "spawn" path = "spawn.rs" @@ -33,3 +36,8 @@ harness = false name = "sync_semaphore" path = "sync_semaphore.rs" harness = false + +[[bench]] +name = "signal" +path = "signal.rs" +harness = false diff --git a/benches/signal.rs b/benches/signal.rs new file mode 100644 index 00000000000..d891326de0d --- /dev/null +++ b/benches/signal.rs @@ -0,0 +1,96 @@ +//! Benchmark the delay in propagating OS signals to any listeners. +#![cfg(unix)] + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::runtime; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::mpsc; + +struct Spinner { + count: usize, +} + +impl Future for Spinner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.count > 3 { + Poll::Ready(()) + } else { + self.count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl Spinner { + fn new() -> Self { + Self { count: 0 } + } +} + +pub fn send_signal(signal: libc::c_int) { + use libc::{getpid, kill}; + + unsafe { + assert_eq!(kill(getpid(), signal), 0); + } +} + +fn many_signals(bench: &mut Bencher) { + let num_signals = 10; + let (tx, mut rx) = mpsc::channel(num_signals); + + let rt = runtime::Builder::new() + // Intentionally single threaded to measure delays in propagating wakes + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + + let spawn_signal = |kind| { + let mut tx = tx.clone(); + rt.spawn(async move { + let mut signal = signal(kind).expect("failed to create signal"); + + while signal.recv().await.is_some() { + if tx.send(()).await.is_err() { + break; + } + } + }); + }; + + for _ in 0..num_signals { + // Pick some random signals which don't terminate the test harness + spawn_signal(SignalKind::child()); + spawn_signal(SignalKind::io()); + } + drop(tx); + + // Turn the runtime for a while to ensure that all the spawned + // tasks have been polled at least once + rt.block_on(Spinner::new()); + + bench.iter(|| { + rt.block_on(async { + send_signal(libc::SIGCHLD); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + + send_signal(libc::SIGIO); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + }); + }); +} + +benchmark_group!(signal_group, many_signals,); + +benchmark_main!(signal_group); diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 8206507280d..b805d2b98c6 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -109,7 +109,18 @@ impl Registration { where T: Evented, { - let handle = Handle::current(); + Self::new_with_ready_and_handle(io, ready, Handle::current()) + } + + /// Same as `new_with_ready` but also accepts an explicit handle. + pub(crate) fn new_with_ready_and_handle( + io: &T, + ready: mio::Ready, + handle: Handle, + ) -> io::Result + where + T: Evented, + { let shared = if let Some(inner) = handle.inner() { inner.add_source(io, ready)? } else { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 42aed3e90b0..4072b04e465 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,7 +1,7 @@ use crate::loom::sync::Mutex; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; -use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner}; use std::fmt; #[cfg(feature = "blocking")] @@ -359,14 +359,17 @@ impl Builder { } } + fn get_cfg(&self) -> driver::Cfg { + driver::Cfg { + enable_io: self.enable_io, + enable_time: self.enable_time, + } + } + fn build_shell_runtime(&mut self) -> io::Result { use crate::runtime::Kind; - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; let spawner = Spawner::Shell; @@ -377,9 +380,10 @@ impl Builder { kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -478,12 +482,7 @@ cfg_rt_core! { fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::{BasicScheduler, Kind}; - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or @@ -500,9 +499,10 @@ cfg_rt_core! { kind: Kind::Basic(Mutex::new(Some(scheduler))), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -533,10 +533,8 @@ cfg_rt_threaded! { let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); - let clock = time::create_clock(); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); @@ -547,9 +545,10 @@ cfg_rt_threaded! { // Create the runtime handle let handle = Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }; diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index c42b3432dea..7498175f3a7 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -14,7 +14,7 @@ cfg_blocking_impl! { } cfg_io_driver! { - pub(crate) fn io_handle() -> crate::runtime::io::Handle { + pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.io_handle.clone(), None => Default::default(), @@ -22,8 +22,18 @@ cfg_io_driver! { } } +cfg_signal! { + #[cfg(unix)] + pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.signal_handle.clone(), + None => Default::default(), + }) + } +} + cfg_time! { - pub(crate) fn time_handle() -> crate::runtime::time::Handle { + pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.time_handle.clone(), None => Default::default(), @@ -31,7 +41,7 @@ cfg_time! { } cfg_test_util! { - pub(crate) fn clock() -> Option { + pub(crate) fn clock() -> Option { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => Some(ctx.clock.clone()), None => None, diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs new file mode 100644 index 00000000000..2ea5089022d --- /dev/null +++ b/tokio/src/runtime/driver.rs @@ -0,0 +1,196 @@ +//! Abstracts out the entire chain of runtime sub-drivers into common types. +use crate::park::{Park, ParkThread}; +use std::io; +use std::time::Duration; + +// ===== io driver ===== + +cfg_io_driver! { + type IoDriver = crate::park::Either; + pub(crate) type IoHandle = Option; + + fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> { + use crate::park::Either; + + #[cfg(loom)] + assert!(!enable); + + if enable { + let driver = crate::io::driver::Driver::new()?; + let handle = driver.handle(); + + Ok((Either::A(driver), Some(handle))) + } else { + let driver = ParkThread::new(); + Ok((Either::B(driver), None)) + } + } +} + +cfg_not_io_driver! { + type IoDriver = ParkThread; + pub(crate) type IoHandle = (); + + fn create_io_driver(_enable: bool) -> io::Result<(IoDriver, IoHandle)> { + let driver = ParkThread::new(); + Ok((driver, ())) + } +} + +// ===== signal driver ===== + +macro_rules! cfg_unix_and_signal { + ($($item:item)*) => { + $( + #[cfg(all(not(loom), unix, feature = "signal"))] + $item + )* + } +} + +macro_rules! cfg_neither_unix_nor_windows { + ($($item:item)*) => { + $( + #[cfg(any(loom, not(all(unix, feature = "signal"))))] + $item + )* + } +} + +cfg_unix_and_signal! { + type SignalDriver = crate::park::Either; + pub(crate) type SignalHandle = Option; + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + use crate::park::Either; + + // Enable the signal driver if IO is also enabled + match io_driver { + Either::A(io_driver) => { + let driver = crate::signal::unix::driver::Driver::new(io_driver)?; + let handle = driver.handle(); + Ok((Either::A(driver), Some(handle))) + } + Either::B(_) => Ok((Either::B(io_driver), None)), + } + } +} + +cfg_neither_unix_nor_windows! { + type SignalDriver = IoDriver; + pub(crate) type SignalHandle = (); + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + Ok((io_driver, ())) + } +} + +// ===== time driver ===== + +cfg_time! { + type TimeDriver = crate::park::Either, SignalDriver>; + + pub(crate) type Clock = crate::time::Clock; + pub(crate) type TimeHandle = Option; + + fn create_clock() -> Clock { + crate::time::Clock::new() + } + + fn create_time_driver( + enable: bool, + signal_driver: SignalDriver, + clock: Clock, + ) -> (TimeDriver, TimeHandle) { + use crate::park::Either; + + if enable { + let driver = crate::time::driver::Driver::new(signal_driver, clock); + let handle = driver.handle(); + + (Either::A(driver), Some(handle)) + } else { + (Either::B(signal_driver), None) + } + } +} + +cfg_not_time! { + type TimeDriver = SignalDriver; + + pub(crate) type Clock = (); + pub(crate) type TimeHandle = (); + + fn create_clock() -> Clock { + () + } + + fn create_time_driver( + _enable: bool, + signal_driver: SignalDriver, + _clock: Clock, + ) -> (TimeDriver, TimeHandle) { + (signal_driver, ()) + } +} + +// ===== runtime driver ===== + +#[derive(Debug)] +pub(crate) struct Driver { + inner: TimeDriver, +} + +pub(crate) struct Resources { + pub(crate) io_handle: IoHandle, + pub(crate) signal_handle: SignalHandle, + pub(crate) time_handle: TimeHandle, + pub(crate) clock: Clock, +} + +pub(crate) struct Cfg { + pub(crate) enable_io: bool, + pub(crate) enable_time: bool, +} + +impl Driver { + pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { + let clock = create_clock(); + + let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?; + let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, signal_driver, clock.clone()); + + Ok(( + Self { inner: time_driver }, + Resources { + io_handle, + signal_handle, + time_handle, + clock, + }, + )) + } +} + +impl Park for Driver { + type Unpark = ::Unpark; + type Error = ::Error; + + fn unpark(&self) -> Self::Unpark { + self.inner.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } + + fn shutdown(&mut self) { + self.inner.shutdown() + } +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 516ad4b3aad..dfcc5e97921 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,4 +1,4 @@ -use crate::runtime::{blocking, context, io, time, Spawner}; +use crate::runtime::{blocking, context, driver, Spawner}; /// Handle to the runtime. /// @@ -11,13 +11,16 @@ pub(crate) struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers - pub(super) io_handle: io::Handle, + pub(super) io_handle: driver::IoHandle, + + /// Handles to the signal drivers + pub(super) signal_handle: driver::SignalHandle, /// Handles to the time drivers - pub(super) time_handle: time::Handle, + pub(super) time_handle: driver::TimeHandle, /// Source of `Instant::now()` - pub(super) clock: time::Clock, + pub(super) clock: driver::Clock, /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs deleted file mode 100644 index 6a0953af851..00000000000 --- a/tokio/src/runtime/io.rs +++ /dev/null @@ -1,63 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -/// Re-exported for convenience. -pub(crate) use std::io::Result; - -pub(crate) use variant::*; - -#[cfg(feature = "io-driver")] -mod variant { - use crate::io::driver; - use crate::park::{Either, ParkThread}; - - use std::io; - - /// The driver value the runtime passes to the `timer` layer. - /// - /// When the `io-driver` feature is enabled, this is the "real" I/O driver - /// backed by Mio. Without the `io-driver` feature, this is a thread parker - /// backed by a condition variable. - pub(crate) type Driver = Either; - - /// The handle the runtime stores for future use. - /// - /// When the `io-driver` feature is **not** enabled, this is `()`. - pub(crate) type Handle = Option; - - pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { - #[cfg(loom)] - assert!(!enable); - - if enable { - let driver = driver::Driver::new()?; - let handle = driver.handle(); - - Ok((Either::A(driver), Some(handle))) - } else { - let driver = ParkThread::new(); - Ok((Either::B(driver), None)) - } - } -} - -#[cfg(not(feature = "io-driver"))] -mod variant { - use crate::park::ParkThread; - - use std::io; - - /// I/O is not enabled, use a condition variable based parker - pub(crate) type Driver = ParkThread; - - /// There is no handle - pub(crate) type Handle = (); - - pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> { - let driver = ParkThread::new(); - - Ok((driver, ())) - } -} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index bec0ecd5949..884c2b46fa4 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -208,13 +208,18 @@ cfg_blocking_impl! { mod builder; pub use self::builder::Builder; +pub(crate) mod driver; + pub(crate) mod enter; use self::enter::enter; mod handle; use handle::Handle; -mod io; +mod io { + /// Re-exported for convenience. + pub(crate) use std::io::Result; +} cfg_rt_threaded! { mod park; @@ -227,8 +232,6 @@ use self::shell::Shell; mod spawner; use self::spawner::Spawner; -mod time; - cfg_rt_threaded! { mod queue; @@ -293,7 +296,7 @@ enum Kind { /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(Mutex>>), + Basic(Mutex>>), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index 1dcf65af91b..c994c935ce2 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; use crate::park::{Park, Unpark}; -use crate::runtime::time; +use crate::runtime::driver::Driver; use crate::util::TryLock; use std::sync::atomic::Ordering::SeqCst; @@ -42,14 +42,14 @@ const NOTIFIED: usize = 3; /// Shared across multiple Parker handles struct Shared { /// Shared driver. Only one thread at a time can use this - driver: TryLock, + driver: TryLock, /// Unpark handle - handle: ::Unpark, + handle: ::Unpark, } impl Parker { - pub(crate) fn new(driver: time::Driver) -> Parker { + pub(crate) fn new(driver: Driver) -> Parker { let handle = driver.unpark(); Parker { @@ -180,7 +180,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut time::Driver) { + fn park_driver(&self, driver: &mut Driver) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index a65869d0de2..3d631239bf8 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,8 +1,8 @@ #![allow(clippy::redundant_clone)] use crate::park::{Park, Unpark}; +use crate::runtime::driver::Driver; use crate::runtime::enter; -use crate::runtime::time; use crate::util::{waker_ref, Wake}; use std::future::Future; @@ -12,17 +12,17 @@ use std::task::Poll::Ready; #[derive(Debug)] pub(super) struct Shell { - driver: time::Driver, + driver: Driver, /// TODO: don't store this unpark: Arc, } #[derive(Debug)] -struct Handle(::Unpark); +struct Handle(::Unpark); impl Shell { - pub(super) fn new(driver: time::Driver) -> Shell { + pub(super) fn new(driver: Driver) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); Shell { driver, unpark } diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs deleted file mode 100644 index c623d9641a1..00000000000 --- a/tokio/src/runtime/time.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the time -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -pub(crate) use variant::*; - -#[cfg(feature = "time")] -mod variant { - use crate::park::Either; - use crate::runtime::io; - use crate::time::{self, driver}; - - pub(crate) type Clock = time::Clock; - pub(crate) type Driver = Either, io::Driver>; - pub(crate) type Handle = Option; - - pub(crate) fn create_clock() -> Clock { - Clock::new() - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - enable: bool, - io_driver: io::Driver, - clock: Clock, - ) -> (Driver, Handle) { - if enable { - let driver = driver::Driver::new(io_driver, clock); - let handle = driver.handle(); - - (Either::A(driver), Some(handle)) - } else { - (Either::B(io_driver), None) - } - } -} - -#[cfg(not(feature = "time"))] -mod variant { - use crate::runtime::io; - - pub(crate) type Clock = (); - pub(crate) type Driver = io::Driver; - pub(crate) type Handle = (); - - pub(crate) fn create_clock() -> Clock { - () - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - _enable: bool, - io_driver: io::Driver, - _clock: Clock, - ) -> (Driver, Handle) { - (io_driver, ()) - } -} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index bc48bdfaa64..45a091d76a6 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -5,7 +5,6 @@ #![cfg(unix)] -use crate::io::{AsyncRead, PollEvented, ReadBuf}; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; use crate::sync::mpsc::{channel, Receiver}; @@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; +pub(crate) mod driver; + pub(crate) type OsStorage = Vec; // Number of different unix signals @@ -202,9 +203,9 @@ impl Default for SignalInfo { /// The purpose of this signal handler is to primarily: /// /// 1. Flag that our specific signal was received (e.g. store an atomic flag) -/// 2. Wake up driver tasks by writing a byte to a pipe +/// 2. Wake up the driver by writing a byte to a pipe /// -/// Those two operations shoudl both be async-signal safe. +/// Those two operations should both be async-signal safe. fn action(globals: Pin<&'static Globals>, signal: c_int) { globals.record_event(signal as EventId); @@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> { )); } + // Check that we have a signal driver running + driver::Handle::current().check_inner()?; + let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, @@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } } -#[derive(Debug)] -struct Driver { - wakeup: PollEvented, -} - -impl Driver { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // Drain the data from the pipe and maintain interest in getting more - self.drain(cx); - // Broadcast any signals which were received - globals().broadcast(); - - Poll::Pending - } -} - -impl Driver { - fn new() -> io::Result { - // NB: We give each driver a "fresh" reciever file descriptor to avoid - // the issues described in alexcrichton/tokio-process#42. - // - // In the past we would reuse the actual receiver file descriptor and - // swallow any errors around double registration of the same descriptor. - // I'm not sure if the second (failed) registration simply doesn't end up - // receiving wake up notifications, or there could be some race condition - // when consuming readiness events, but having distinct descriptors for - // distinct PollEvented instances appears to mitigate this. - // - // Unfortunately we cannot just use a single global PollEvented instance - // either, since we can't compare Handles or assume they will always - // point to the exact same reactor. - let stream = globals().receiver.try_clone()?; - let wakeup = PollEvented::new(stream)?; - - Ok(Driver { wakeup }) - } - - /// Drain all data in the global receiver, ensuring we'll get woken up when - /// there is a write on the other end. - /// - /// We do *NOT* use the existence of any read bytes as evidence a signal was - /// received since the `pending` flags would have already been set if that - /// was the case. See - /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more - /// info. - fn drain(&mut self, cx: &mut Context<'_>) { - let mut buf = [0; 128]; - let mut buf = ReadBuf::new(&mut buf); - loop { - match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) { - Poll::Ready(Ok(())) => { - if buf.filled().is_empty() { - panic!("EOF on self-pipe") - } - buf.clear(); - } - Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e), - Poll::Pending => break, - } - } - } -} - /// A stream of events for receiving a particular type of OS signal. /// /// In general signal handling on Unix is a pretty tricky topic, and this @@ -382,7 +323,6 @@ impl Driver { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Signal { - driver: Driver, rx: Receiver<()>, } @@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result { // Turn the signal delivery on once we are ready for it signal_enable(signal)?; - // Ensure there's a driver for our associated event loop processing - // signals. - let driver = Driver::new()?; - // One wakeup in a queue is enough, no need for us to buffer up any // more. let (tx, rx) = channel(1); globals().register_listener(signal as EventId, tx); - Ok(Signal { driver, rx }) + Ok(Signal { rx }) } impl Signal { @@ -484,7 +420,6 @@ impl Signal { /// } /// ``` pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - let _ = self.driver.poll(cx); self.rx.poll_recv(cx) } } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs new file mode 100644 index 00000000000..0458893cce4 --- /dev/null +++ b/tokio/src/signal/unix/driver.rs @@ -0,0 +1,153 @@ +//! Signal driver + +use crate::io::driver::Driver as IoDriver; +use crate::io::Registration; +use crate::park::Park; +use crate::runtime::context; +use crate::signal::registry::globals; +use mio_uds::UnixStream; +use std::io::{self, Read}; +use std::sync::{Arc, Weak}; +use std::time::Duration; + +/// Responsible for registering wakeups when an OS signal is received, and +/// subsequently dispatching notifications to any signal listeners as appropriate. +/// +/// Note: this driver relies on having an enabled IO driver in order to listen to +/// pipe write wakeups. +#[derive(Debug)] +pub(crate) struct Driver { + /// Thread parker. The `Driver` park implementation delegates to this. + park: IoDriver, + + /// A pipe for receiving wake events from the signal handler + receiver: UnixStream, + + /// The actual registraiton for `receiver` when active. + /// Lazily bound at the first signal registration. + registration: Registration, + + /// Shared state + inner: Arc, +} + +#[derive(Clone, Debug)] +pub(crate) struct Handle { + inner: Weak, +} + +#[derive(Debug)] +pub(super) struct Inner(()); + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: IoDriver) -> io::Result { + // NB: We give each driver a "fresh" reciever file descriptor to avoid + // the issues described in alexcrichton/tokio-process#42. + // + // In the past we would reuse the actual receiver file descriptor and + // swallow any errors around double registration of the same descriptor. + // I'm not sure if the second (failed) registration simply doesn't end up + // receiving wake up notifications, or there could be some race condition + // when consuming readiness events, but having distinct descriptors for + // distinct PollEvented instances appears to mitigate this. + // + // Unfortunately we cannot just use a single global PollEvented instance + // either, since we can't compare Handles or assume they will always + // point to the exact same reactor. + let receiver = globals().receiver.try_clone()?; + let registration = + Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?; + + Ok(Self { + park, + receiver, + registration, + inner: Arc::new(Inner(())), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + pub(crate) fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + fn process(&self) { + // Check if the pipe is ready to read and therefore has "woken" us up + match self.registration.take_read_ready() { + Ok(Some(ready)) => assert!(ready.is_readable()), + Ok(None) => return, // No wake has arrived, bail + Err(e) => panic!("reactor gone: {}", e), + } + + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + loop { + match (&self.receiver).read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Bad read on self-pipe: {}", e), + } + } + + // Broadcast any signals which were received + globals().broadcast(); + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = ::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + self.process(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + self.process(); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current driver + /// + /// # Panics + /// + /// This function panics if there is no current signal driver set. + pub(super) fn current() -> Self { + context::signal_handle().expect( + "there is no signal driver running, must be called from the context of Tokio runtime", + ) + } + + pub(super) fn check_inner(&self) -> io::Result<()> { + if self.inner.strong_count() > 0 { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")) + } + } +}