From 4cf45c038b9691f24fac22df13594c2223b185f6 Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Tue, 6 Oct 2020 10:30:16 -0700 Subject: [PATCH] process: add ProcessDriver to handle orphan reaping (#2907) --- tokio/src/macros/cfg.rs | 17 ++++ tokio/src/process/mod.rs | 5 + tokio/src/process/unix/driver.rs | 154 +++++++++++++++++++++++++++++++ tokio/src/process/unix/mod.rs | 30 +++--- tokio/src/process/unix/orphan.rs | 78 ++++++++++------ tokio/src/process/unix/reap.rs | 57 +++--------- tokio/src/runtime/driver.rs | 45 +++++++-- tokio/src/signal/unix.rs | 37 +++++++- tokio/src/signal/unix/driver.rs | 2 +- 9 files changed, 327 insertions(+), 98 deletions(-) create mode 100644 tokio/src/process/unix/driver.rs diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index f245b09e921..328f3230452 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -242,6 +242,23 @@ macro_rules! cfg_process { } } +macro_rules! cfg_process_driver { + ($($item:item)*) => { + #[cfg(unix)] + #[cfg(not(loom))] + cfg_process! { $($item)* } + } +} + +macro_rules! cfg_not_process_driver { + ($($item:item)*) => { + $( + #[cfg(not(all(unix, not(loom), feature = "process")))] + $item + )* + } +} + macro_rules! cfg_signal { ($($item:item)*) => { $( diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 7da9783be34..ff84428e002 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -113,6 +113,11 @@ #[cfg(unix)] mod imp; +#[cfg(unix)] +pub(crate) mod unix { + pub(crate) use super::imp::*; +} + #[path = "windows.rs"] #[cfg(windows)] mod imp; diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs new file mode 100644 index 00000000000..2eea0043c93 --- /dev/null +++ b/tokio/src/process/unix/driver.rs @@ -0,0 +1,154 @@ +//! Process driver + +use crate::park::Park; +use crate::process::unix::orphan::ReapOrphanQueue; +use crate::process::unix::GlobalOrphanQueue; +use crate::signal::unix::driver::Driver as SignalDriver; +use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind}; +use crate::sync::mpsc::error::TryRecvError; + +use std::io; +use std::time::Duration; + +/// Responsible for cleaning up orphaned child processes on Unix platforms. +#[derive(Debug)] +pub(crate) struct Driver { + park: SignalDriver, + inner: CoreDriver, +} + +#[derive(Debug)] +struct CoreDriver { + sigchild: S, + orphan_queue: Q, +} + +// ===== impl CoreDriver ===== + +impl CoreDriver +where + S: InternalStream, + Q: ReapOrphanQueue, +{ + fn got_signal(&mut self) -> bool { + match self.sigchild.try_recv() { + Ok(()) => true, + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Closed) => panic!("signal was deregistered"), + } + } + + fn process(&mut self) { + if self.got_signal() { + // Drain all notifications which may have been buffered + // so we can try to reap all orphans in one batch + while self.got_signal() {} + + self.orphan_queue.reap_orphans(); + } + } +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: SignalDriver) -> io::Result { + let sigchild = signal_with_handle(SignalKind::child(), park.handle())?; + let inner = CoreDriver { + sigchild, + orphan_queue: GlobalOrphanQueue, + }; + + Ok(Self { park, inner }) + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = ::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + self.inner.process(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + self.inner.process(); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::process::unix::orphan::test::MockQueue; + use crate::sync::mpsc::error::TryRecvError; + use std::task::{Context, Poll}; + + struct MockStream { + total_try_recv: usize, + values: Vec>, + } + + impl MockStream { + fn new(values: Vec>) -> Self { + Self { + total_try_recv: 0, + values, + } + } + } + + impl InternalStream for MockStream { + fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll> { + unimplemented!(); + } + + fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.total_try_recv += 1; + match self.values.remove(0) { + Some(()) => Ok(()), + None => Err(TryRecvError::Empty), + } + } + } + + #[test] + fn no_reap_if_no_signal() { + let mut driver = CoreDriver { + sigchild: MockStream::new(vec![None]), + orphan_queue: MockQueue::<()>::new(), + }; + + driver.process(); + + assert_eq!(1, driver.sigchild.total_try_recv); + assert_eq!(0, driver.orphan_queue.total_reaps.get()); + } + + #[test] + fn coalesce_signals_before_reaping() { + let mut driver = CoreDriver { + sigchild: MockStream::new(vec![Some(()), Some(()), None]), + orphan_queue: MockQueue::<()>::new(), + }; + + driver.process(); + + assert_eq!(3, driver.sigchild.total_try_recv); + assert_eq!(1, driver.orphan_queue.total_reaps.get()); + } +} diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index 46f1d790db1..db9d592c8e2 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -21,8 +21,10 @@ //! processes in general aren't scalable (e.g. millions) so it shouldn't be that //! bad in theory... -mod orphan; -use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; +pub(crate) mod driver; + +pub(crate) mod orphan; +use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait}; mod reap; use reap::Reaper; @@ -39,11 +41,11 @@ use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; -use std::process::ExitStatus; +use std::process::{Child as StdChild, ExitStatus}; use std::task::Context; use std::task::Poll; -impl Wait for std::process::Child { +impl Wait for StdChild { fn id(&self) -> u32 { self.id() } @@ -53,17 +55,17 @@ impl Wait for std::process::Child { } } -impl Kill for std::process::Child { +impl Kill for StdChild { fn kill(&mut self) -> io::Result<()> { self.kill() } } lazy_static::lazy_static! { - static ref ORPHAN_QUEUE: OrphanQueueImpl = OrphanQueueImpl::new(); + static ref ORPHAN_QUEUE: OrphanQueueImpl = OrphanQueueImpl::new(); } -struct GlobalOrphanQueue; +pub(crate) struct GlobalOrphanQueue; impl fmt::Debug for GlobalOrphanQueue { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -71,19 +73,21 @@ impl fmt::Debug for GlobalOrphanQueue { } } -impl OrphanQueue for GlobalOrphanQueue { - fn push_orphan(&self, orphan: std::process::Child) { - ORPHAN_QUEUE.push_orphan(orphan) - } - +impl ReapOrphanQueue for GlobalOrphanQueue { fn reap_orphans(&self) { ORPHAN_QUEUE.reap_orphans() } } +impl OrphanQueue for GlobalOrphanQueue { + fn push_orphan(&self, orphan: StdChild) { + ORPHAN_QUEUE.push_orphan(orphan) + } +} + #[must_use = "futures do nothing unless polled"] pub(crate) struct Child { - inner: Reaper, + inner: Reaper, } impl fmt::Debug for Child { diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs index 6c449a90936..8a1e127831f 100644 --- a/tokio/src/process/unix/orphan.rs +++ b/tokio/src/process/unix/orphan.rs @@ -20,23 +20,29 @@ impl Wait for &mut T { } } -/// An interface for queueing up an orphaned process so that it can be reaped. -pub(crate) trait OrphanQueue { - /// Adds an orphan to the queue. - fn push_orphan(&self, orphan: T); +/// An interface for reaping a set of orphaned processes. +pub(crate) trait ReapOrphanQueue { /// Attempts to reap every process in the queue, ignoring any errors and /// enqueueing any orphans which have not yet exited. fn reap_orphans(&self); } +impl ReapOrphanQueue for &T { + fn reap_orphans(&self) { + (**self).reap_orphans() + } +} + +/// An interface for queueing up an orphaned process so that it can be reaped. +pub(crate) trait OrphanQueue: ReapOrphanQueue { + /// Adds an orphan to the queue. + fn push_orphan(&self, orphan: T); +} + impl> OrphanQueue for &O { fn push_orphan(&self, orphan: T) { (**self).push_orphan(orphan); } - - fn reap_orphans(&self) { - (**self).reap_orphans() - } } /// An implementation of `OrphanQueue`. @@ -62,42 +68,62 @@ impl OrphanQueue for OrphanQueueImpl { fn push_orphan(&self, orphan: T) { self.queue.lock().unwrap().push(orphan) } +} +impl ReapOrphanQueue for OrphanQueueImpl { fn reap_orphans(&self) { let mut queue = self.queue.lock().unwrap(); let queue = &mut *queue; - let mut i = 0; - while i < queue.len() { + for i in (0..queue.len()).rev() { match queue[i].try_wait() { - Ok(Some(_)) => {} - Err(_) => { - // TODO: bubble up error some how. Is this an internal bug? - // Shoudl we panic? Is it OK for this to be silently - // dropped? - } - // Still not done yet - Ok(None) => { - i += 1; - continue; + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); } } - - queue.remove(i); } } } #[cfg(all(test, not(loom)))] -mod test { - use super::Wait; - use super::{OrphanQueue, OrphanQueueImpl}; - use std::cell::Cell; +pub(crate) mod test { + use super::*; + use std::cell::{Cell, RefCell}; use std::io; use std::os::unix::process::ExitStatusExt; use std::process::ExitStatus; use std::rc::Rc; + pub(crate) struct MockQueue { + pub(crate) all_enqueued: RefCell>, + pub(crate) total_reaps: Cell, + } + + impl MockQueue { + pub(crate) fn new() -> Self { + Self { + all_enqueued: RefCell::new(Vec::new()), + total_reaps: Cell::new(0), + } + } + } + + impl OrphanQueue for MockQueue { + fn push_orphan(&self, orphan: W) { + self.all_enqueued.borrow_mut().push(orphan); + } + } + + impl ReapOrphanQueue for MockQueue { + fn reap_orphans(&self) { + self.total_reaps.set(self.total_reaps.get() + 1); + } + } + struct MockWait { total_waits: Rc>, num_wait_until_status: usize, diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index c51a20b9c01..de483c44b41 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -1,6 +1,6 @@ use crate::process::imp::orphan::{OrphanQueue, Wait}; use crate::process::kill::Kill; -use crate::signal::unix::Signal; +use crate::signal::unix::InternalStream; use std::future::Future; use std::io; @@ -23,17 +23,6 @@ where signal: S, } -// Work around removal of `futures_core` dependency -pub(crate) trait Stream: Unpin { - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll>; -} - -impl Stream for Signal { - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - Signal::poll_recv(self, cx) - } -} - impl Deref for Reaper where W: Wait + Unpin, @@ -72,7 +61,7 @@ impl Future for Reaper where W: Wait + Unpin, Q: OrphanQueue + Unpin, - S: Stream, + S: InternalStream, { type Output = io::Result; @@ -80,10 +69,8 @@ where loop { // If the child hasn't exited yet, then it's our responsibility to // ensure the current task gets notified when it might be able to - // make progress. - // - // As described in `spawn` above, we just indicate that we can - // next make progress once a SIGCHLD is received. + // make progress. We can use the delivery of a SIGCHLD signal as a + // sign that we can potentially make progress. // // However, we will register for a notification on the next signal // BEFORE we poll the child. Otherwise it is possible that the child @@ -99,7 +86,6 @@ where // should not cause significant issues with parent futures. let registered_interest = self.signal.poll_recv(cx).is_pending(); - self.orphan_queue.reap_orphans(); if let Some(status) = self.inner_mut().try_wait()? { return Poll::Ready(Ok(status)); } @@ -147,8 +133,9 @@ where mod test { use super::*; + use crate::process::unix::orphan::test::MockQueue; + use crate::sync::mpsc::error::TryRecvError; use futures::future::FutureExt; - use std::cell::{Cell, RefCell}; use std::os::unix::process::ExitStatusExt; use std::process::ExitStatus; use std::task::Context; @@ -211,7 +198,7 @@ mod test { } } - impl Stream for MockStream { + impl InternalStream for MockStream { fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll> { self.total_polls += 1; match self.values.remove(0) { @@ -219,29 +206,9 @@ mod test { None => Poll::Pending, } } - } - struct MockQueue { - all_enqueued: RefCell>, - total_reaps: Cell, - } - - impl MockQueue { - fn new() -> Self { - Self { - all_enqueued: RefCell::new(Vec::new()), - total_reaps: Cell::new(0), - } - } - } - - impl OrphanQueue for MockQueue { - fn push_orphan(&self, orphan: W) { - self.all_enqueued.borrow_mut().push(orphan); - } - - fn reap_orphans(&self) { - self.total_reaps.set(self.total_reaps.get() + 1); + fn try_recv(&mut self) -> Result<(), TryRecvError> { + unimplemented!(); } } @@ -262,7 +229,7 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.total_waits); - assert_eq!(1, grim.orphan_queue.total_reaps.get()); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Not yet exited, couldn't register interest the first time @@ -270,7 +237,7 @@ mod test { assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.total_waits); - assert_eq!(3, grim.orphan_queue.total_reaps.get()); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Exited @@ -283,7 +250,7 @@ mod test { } assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.total_waits); - assert_eq!(4, grim.orphan_queue.total_reaps.get()); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 148e0b1655f..af8e17a33dd 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -38,6 +38,7 @@ cfg_not_io_driver! { } // ===== signal driver ===== + macro_rules! cfg_signal_internal_and_unix { ($($item:item)*) => { #[cfg(unix)] @@ -73,10 +74,37 @@ cfg_not_signal_internal! { } } +// ===== process driver ===== + +cfg_process_driver! { + type ProcessDriver = crate::park::Either; + + fn create_process_driver(signal_driver: SignalDriver) -> io::Result { + use crate::park::Either; + + // Enable the signal driver if IO is also enabled + match signal_driver { + Either::A(signal_driver) => { + let driver = crate::process::unix::driver::Driver::new(signal_driver)?; + Ok(Either::A(driver)) + } + Either::B(_) => Ok(Either::B(signal_driver)), + } + } +} + +cfg_not_process_driver! { + type ProcessDriver = SignalDriver; + + fn create_process_driver(signal_driver: SignalDriver) -> io::Result { + Ok(signal_driver) + } +} + // ===== time driver ===== cfg_time! { - type TimeDriver = crate::park::Either, SignalDriver>; + type TimeDriver = crate::park::Either, ProcessDriver>; pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; @@ -87,24 +115,24 @@ cfg_time! { fn create_time_driver( enable: bool, - signal_driver: SignalDriver, + process_driver: ProcessDriver, clock: Clock, ) -> (TimeDriver, TimeHandle) { use crate::park::Either; if enable { - let driver = crate::time::driver::Driver::new(signal_driver, clock); + let driver = crate::time::driver::Driver::new(process_driver, clock); let handle = driver.handle(); (Either::A(driver), Some(handle)) } else { - (Either::B(signal_driver), None) + (Either::B(process_driver), None) } } } cfg_not_time! { - type TimeDriver = SignalDriver; + type TimeDriver = ProcessDriver; pub(crate) type Clock = (); pub(crate) type TimeHandle = (); @@ -115,10 +143,10 @@ cfg_not_time! { fn create_time_driver( _enable: bool, - signal_driver: SignalDriver, + process_driver: ProcessDriver, _clock: Clock, ) -> (TimeDriver, TimeHandle) { - (signal_driver, ()) + (process_driver, ()) } } @@ -147,8 +175,9 @@ impl Driver { let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?; let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; + let process_driver = create_process_driver(signal_driver)?; let (time_driver, time_handle) = - create_time_driver(cfg.enable_time, signal_driver, clock.clone()); + create_time_driver(cfg.enable_time, process_driver, clock.clone()); Ok(( Self { inner: time_driver }, diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 77f300bb192..aaaa75edb74 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -6,6 +6,7 @@ #![cfg(unix)] use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; +use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::{channel, Receiver}; use libc::c_int; @@ -17,6 +18,7 @@ use std::sync::Once; use std::task::{Context, Poll}; pub(crate) mod driver; +use self::driver::Handle; pub(crate) type OsStorage = Vec; @@ -220,7 +222,7 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) { /// /// This will register the signal handler if it hasn't already been registered, /// returning any error along the way if that fails. -fn signal_enable(signal: c_int) -> io::Result<()> { +fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> { if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { return Err(Error::new( ErrorKind::Other, @@ -229,7 +231,7 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } // Check that we have a signal driver running - driver::Handle::current().check_inner()?; + handle.check_inner()?; let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { @@ -349,10 +351,14 @@ pub struct Signal { /// * If the signal is one of /// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) pub fn signal(kind: SignalKind) -> io::Result { + signal_with_handle(kind, Handle::current()) +} + +pub(crate) fn signal_with_handle(kind: SignalKind, handle: Handle) -> io::Result { let signal = kind.0; // Turn the signal delivery on once we are ready for it - signal_enable(signal)?; + signal_enable(signal, handle)?; // One wakeup in a queue is enough, no need for us to buffer up any // more. @@ -394,6 +400,11 @@ impl Signal { pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.rx.poll_recv(cx) } + + /// Try to receive a signal notification without blocking or registering a waker. + pub(crate) fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.rx.try_recv() + } } cfg_stream! { @@ -406,6 +417,22 @@ cfg_stream! { } } +// Work around for abstracting streams internally +pub(crate) trait InternalStream: Unpin { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll>; + fn try_recv(&mut self) -> Result<(), TryRecvError>; +} + +impl InternalStream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_recv(cx) + } + + fn try_recv(&mut self) -> Result<(), TryRecvError> { + self.try_recv() + } +} + pub(crate) fn ctrl_c() -> io::Result { signal(SignalKind::interrupt()) } @@ -416,11 +443,11 @@ mod tests { #[test] fn signal_enable_error_on_invalid_input() { - signal_enable(-1).unwrap_err(); + signal_enable(-1, Handle::default()).unwrap_err(); } #[test] fn signal_enable_error_on_forbidden_input() { - signal_enable(signal_hook_registry::FORBIDDEN[0]).unwrap_err(); + signal_enable(signal_hook_registry::FORBIDDEN[0], Handle::default()).unwrap_err(); } } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index ae60c22fad2..d0615312f1e 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -30,7 +30,7 @@ pub(crate) struct Driver { inner: Arc, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub(crate) struct Handle { inner: Weak, }