Skip to content

Commit

Permalink
process: add ProcessDriver to handle orphan reaping (#2907)
Browse files Browse the repository at this point in the history
  • Loading branch information
ipetkov committed Oct 6, 2020
1 parent 9730317 commit 4cf45c0
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 98 deletions.
17 changes: 17 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -242,6 +242,23 @@ macro_rules! cfg_process {
}
}

macro_rules! cfg_process_driver {
($($item:item)*) => {
#[cfg(unix)]
#[cfg(not(loom))]
cfg_process! { $($item)* }
}
}

macro_rules! cfg_not_process_driver {
($($item:item)*) => {
$(
#[cfg(not(all(unix, not(loom), feature = "process")))]
$item
)*
}
}

macro_rules! cfg_signal {
($($item:item)*) => {
$(
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/process/mod.rs
Expand Up @@ -113,6 +113,11 @@
#[cfg(unix)]
mod imp;

#[cfg(unix)]
pub(crate) mod unix {
pub(crate) use super::imp::*;
}

#[path = "windows.rs"]
#[cfg(windows)]
mod imp;
Expand Down
154 changes: 154 additions & 0 deletions tokio/src/process/unix/driver.rs
@@ -0,0 +1,154 @@
//! Process driver

use crate::park::Park;
use crate::process::unix::orphan::ReapOrphanQueue;
use crate::process::unix::GlobalOrphanQueue;
use crate::signal::unix::driver::Driver as SignalDriver;
use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind};
use crate::sync::mpsc::error::TryRecvError;

use std::io;
use std::time::Duration;

/// Responsible for cleaning up orphaned child processes on Unix platforms.
#[derive(Debug)]
pub(crate) struct Driver {
park: SignalDriver,
inner: CoreDriver<Signal, GlobalOrphanQueue>,
}

#[derive(Debug)]
struct CoreDriver<S, Q> {
sigchild: S,
orphan_queue: Q,
}

// ===== impl CoreDriver =====

impl<S, Q> CoreDriver<S, Q>
where
S: InternalStream,
Q: ReapOrphanQueue,
{
fn got_signal(&mut self) -> bool {
match self.sigchild.try_recv() {
Ok(()) => true,
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Closed) => panic!("signal was deregistered"),
}
}

fn process(&mut self) {
if self.got_signal() {
// Drain all notifications which may have been buffered
// so we can try to reap all orphans in one batch
while self.got_signal() {}

self.orphan_queue.reap_orphans();
}
}
}

// ===== impl Driver =====

impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(park: SignalDriver) -> io::Result<Self> {
let sigchild = signal_with_handle(SignalKind::child(), park.handle())?;
let inner = CoreDriver {
sigchild,
orphan_queue: GlobalOrphanQueue,
};

Ok(Self { park, inner })
}
}

// ===== impl Park for Driver =====

impl Park for Driver {
type Unpark = <SignalDriver as Park>::Unpark;
type Error = io::Error;

fn unpark(&self) -> Self::Unpark {
self.park.unpark()
}

fn park(&mut self) -> Result<(), Self::Error> {
self.park.park()?;
self.inner.process();
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.park.park_timeout(duration)?;
self.inner.process();
Ok(())
}

fn shutdown(&mut self) {
self.park.shutdown()
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::process::unix::orphan::test::MockQueue;
use crate::sync::mpsc::error::TryRecvError;
use std::task::{Context, Poll};

struct MockStream {
total_try_recv: usize,
values: Vec<Option<()>>,
}

impl MockStream {
fn new(values: Vec<Option<()>>) -> Self {
Self {
total_try_recv: 0,
values,
}
}
}

impl InternalStream for MockStream {
fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
unimplemented!();
}

fn try_recv(&mut self) -> Result<(), TryRecvError> {
self.total_try_recv += 1;
match self.values.remove(0) {
Some(()) => Ok(()),
None => Err(TryRecvError::Empty),
}
}
}

#[test]
fn no_reap_if_no_signal() {
let mut driver = CoreDriver {
sigchild: MockStream::new(vec![None]),
orphan_queue: MockQueue::<()>::new(),
};

driver.process();

assert_eq!(1, driver.sigchild.total_try_recv);
assert_eq!(0, driver.orphan_queue.total_reaps.get());
}

#[test]
fn coalesce_signals_before_reaping() {
let mut driver = CoreDriver {
sigchild: MockStream::new(vec![Some(()), Some(()), None]),
orphan_queue: MockQueue::<()>::new(),
};

driver.process();

assert_eq!(3, driver.sigchild.total_try_recv);
assert_eq!(1, driver.orphan_queue.total_reaps.get());
}
}
30 changes: 17 additions & 13 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -21,8 +21,10 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...

mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
pub(crate) mod driver;

pub(crate) mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait};

mod reap;
use reap::Reaper;
Expand All @@ -39,11 +41,11 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::process::ExitStatus;
use std::process::{Child as StdChild, ExitStatus};
use std::task::Context;
use std::task::Poll;

impl Wait for std::process::Child {
impl Wait for StdChild {
fn id(&self) -> u32 {
self.id()
}
Expand All @@ -53,37 +55,39 @@ impl Wait for std::process::Child {
}
}

impl Kill for std::process::Child {
impl Kill for StdChild {
fn kill(&mut self) -> io::Result<()> {
self.kill()
}
}

lazy_static::lazy_static! {
static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new();
static ref ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
}

struct GlobalOrphanQueue;
pub(crate) struct GlobalOrphanQueue;

impl fmt::Debug for GlobalOrphanQueue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
ORPHAN_QUEUE.fmt(fmt)
}
}

impl OrphanQueue<std::process::Child> for GlobalOrphanQueue {
fn push_orphan(&self, orphan: std::process::Child) {
ORPHAN_QUEUE.push_orphan(orphan)
}

impl ReapOrphanQueue for GlobalOrphanQueue {
fn reap_orphans(&self) {
ORPHAN_QUEUE.reap_orphans()
}
}

impl OrphanQueue<StdChild> for GlobalOrphanQueue {
fn push_orphan(&self, orphan: StdChild) {
ORPHAN_QUEUE.push_orphan(orphan)
}
}

#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>,
inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
}

impl fmt::Debug for Child {
Expand Down
78 changes: 52 additions & 26 deletions tokio/src/process/unix/orphan.rs
Expand Up @@ -20,23 +20,29 @@ impl<T: Wait> Wait for &mut T {
}
}

/// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T> {
/// Adds an orphan to the queue.
fn push_orphan(&self, orphan: T);
/// An interface for reaping a set of orphaned processes.
pub(crate) trait ReapOrphanQueue {
/// Attempts to reap every process in the queue, ignoring any errors and
/// enqueueing any orphans which have not yet exited.
fn reap_orphans(&self);
}

impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
fn reap_orphans(&self) {
(**self).reap_orphans()
}
}

/// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
/// Adds an orphan to the queue.
fn push_orphan(&self, orphan: T);
}

impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
fn push_orphan(&self, orphan: T) {
(**self).push_orphan(orphan);
}

fn reap_orphans(&self) {
(**self).reap_orphans()
}
}

/// An implementation of `OrphanQueue`.
Expand All @@ -62,42 +68,62 @@ impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
fn push_orphan(&self, orphan: T) {
self.queue.lock().unwrap().push(orphan)
}
}

impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
fn reap_orphans(&self) {
let mut queue = self.queue.lock().unwrap();
let queue = &mut *queue;

let mut i = 0;
while i < queue.len() {
for i in (0..queue.len()).rev() {
match queue[i].try_wait() {
Ok(Some(_)) => {}
Err(_) => {
// TODO: bubble up error some how. Is this an internal bug?
// Shoudl we panic? Is it OK for this to be silently
// dropped?
}
// Still not done yet
Ok(None) => {
i += 1;
continue;
Ok(None) => {}
Ok(Some(_)) | Err(_) => {
// The stdlib handles interruption errors (EINTR) when polling a child process.
// All other errors represent invalid inputs or pids that have already been
// reaped, so we can drop the orphan in case an error is raised.
queue.swap_remove(i);
}
}

queue.remove(i);
}
}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::Wait;
use super::{OrphanQueue, OrphanQueueImpl};
use std::cell::Cell;
pub(crate) mod test {
use super::*;
use std::cell::{Cell, RefCell};
use std::io;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::rc::Rc;

pub(crate) struct MockQueue<W> {
pub(crate) all_enqueued: RefCell<Vec<W>>,
pub(crate) total_reaps: Cell<usize>,
}

impl<W> MockQueue<W> {
pub(crate) fn new() -> Self {
Self {
all_enqueued: RefCell::new(Vec::new()),
total_reaps: Cell::new(0),
}
}
}

impl<W> OrphanQueue<W> for MockQueue<W> {
fn push_orphan(&self, orphan: W) {
self.all_enqueued.borrow_mut().push(orphan);
}
}

impl<W> ReapOrphanQueue for MockQueue<W> {
fn reap_orphans(&self) {
self.total_reaps.set(self.total_reaps.get() + 1);
}
}

struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
Expand Down

0 comments on commit 4cf45c0

Please sign in to comment.