Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: add ProcessDriver to handle orphan reaping #2907

Merged
merged 1 commit into from Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -242,6 +242,23 @@ macro_rules! cfg_process {
}
}

/// Emits the given items only where the process driver can exist: on Unix
/// targets, outside of `loom` builds, and subject to whatever additional
/// gating `cfg_process!` applies to the items it wraps.
macro_rules! cfg_process_driver {
    ($($item:item)*) => {
        // Both attributes apply to the `cfg_process!` invocation as a whole,
        // so every wrapped item is dropped together when either one fails.
        #[cfg(unix)]
        #[cfg(not(loom))]
        cfg_process! { $($item)* }
    }
}

/// Emits each item only when the process driver is NOT available, i.e. unless
/// the target is Unix, the build is not under `loom`, and the `process`
/// feature is enabled, all at once.
macro_rules! cfg_not_process_driver {
    ($($item:item)*) => {
        $(
            // The attribute is repeated per item so each one is gated
            // individually.
            #[cfg(not(all(unix, not(loom), feature = "process")))]
            $item
        )*
    }
}

macro_rules! cfg_signal {
($($item:item)*) => {
$(
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/process/mod.rs
Expand Up @@ -113,6 +113,11 @@
#[cfg(unix)]
mod imp;

// Re-exports everything from the Unix `imp` module under the crate-internal
// path `crate::process::unix`, so other modules can name Unix-only items
// without going through the platform-neutral `imp` alias.
#[cfg(unix)]
pub(crate) mod unix {
    pub(crate) use super::imp::*;
}

#[path = "windows.rs"]
#[cfg(windows)]
mod imp;
Expand Down
154 changes: 154 additions & 0 deletions tokio/src/process/unix/driver.rs
@@ -0,0 +1,154 @@
//! Process driver

use crate::park::Park;
use crate::process::unix::orphan::ReapOrphanQueue;
use crate::process::unix::GlobalOrphanQueue;
use crate::signal::unix::driver::Driver as SignalDriver;
use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind};
use crate::sync::mpsc::error::TryRecvError;

use std::io;
use std::time::Duration;

/// Responsible for cleaning up orphaned child processes on Unix platforms.
///
/// Wraps the signal driver: parking is forwarded to it, and after each
/// successful park any pending child-signal notification triggers a reap of
/// the global orphan queue.
#[derive(Debug)]
pub(crate) struct Driver {
    /// The wrapped signal driver that all parking and unparking is delegated to.
    park: SignalDriver,
    /// Signal-draining and orphan-reaping logic, bound to the real signal
    /// stream and the global orphan queue.
    inner: CoreDriver<Signal, GlobalOrphanQueue>,
}

/// The reaping logic of the process driver, generic over its signal source
/// `S` and orphan queue `Q` so both can be swapped out (the tests in this
/// module substitute mocks for each).
#[derive(Debug)]
struct CoreDriver<S, Q> {
    /// Stream of child-signal notifications, polled without blocking.
    sigchild: S,
    /// Queue of orphaned processes to reap when a notification arrives.
    orphan_queue: Q,
}

// ===== impl CoreDriver =====

impl<S, Q> CoreDriver<S, Q>
where
    S: InternalStream,
    Q: ReapOrphanQueue,
{
    /// Polls the signal stream once, without blocking.
    ///
    /// Returns `true` when a notification was consumed and `false` when none
    /// is currently pending.
    ///
    /// # Panics
    ///
    /// Panics if the stream reports that it has been closed.
    fn got_signal(&mut self) -> bool {
        match self.sigchild.try_recv() {
            Err(TryRecvError::Closed) => panic!("signal was deregistered"),
            // Only `Ok(())` and `Err(Empty)` remain: received vs. nothing pending.
            outcome => outcome.is_ok(),
        }
    }

    /// Reaps the orphan queue once if at least one signal notification is
    /// pending; does nothing otherwise.
    fn process(&mut self) {
        if !self.got_signal() {
            return;
        }

        // Swallow every further notification that is already buffered, so a
        // burst of signals results in a single reap pass.
        loop {
            if !self.got_signal() {
                break;
            }
        }

        self.orphan_queue.reap_orphans();
    }
}

// ===== impl Driver =====

impl Driver {
    /// Creates a new process `Driver` that wraps the signal driver `park` and
    /// delegates parking to it.
    ///
    /// A child-signal listener is registered through `park`'s handle, and the
    /// driver is bound to the global orphan queue.
    ///
    /// # Errors
    ///
    /// Returns the error from `signal_with_handle` if the listener cannot be
    /// registered.
    pub(crate) fn new(park: SignalDriver) -> io::Result<Self> {
        let sigchild = signal_with_handle(SignalKind::child(), park.handle())?;
        let inner = CoreDriver {
            sigchild,
            orphan_queue: GlobalOrphanQueue,
        };

        Ok(Self { park, inner })
    }
}

// ===== impl Park for Driver =====

impl Park for Driver {
    // Unparking is handled entirely by the wrapped signal driver, so its
    // handle type is reused unchanged.
    type Unpark = <SignalDriver as Park>::Unpark;
    type Error = io::Error;

    /// Returns the unpark handle of the wrapped signal driver.
    fn unpark(&self) -> Self::Unpark {
        self.park.unpark()
    }

    /// Parks via the wrapped signal driver, then runs the reap check.
    ///
    /// If parking fails the error is returned immediately and the reap check
    /// is skipped.
    fn park(&mut self) -> Result<(), Self::Error> {
        self.park.park()?;
        self.inner.process();
        Ok(())
    }

    /// Same as `park`, but bounded by `duration`; the reap check still runs
    /// after every successful return of the wrapped driver.
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        self.park.park_timeout(duration)?;
        self.inner.process();
        Ok(())
    }

    /// Shuts down the wrapped signal driver. No reap pass is performed here.
    fn shutdown(&mut self) {
        self.park.shutdown()
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::process::unix::orphan::test::MockQueue;
    use crate::sync::mpsc::error::TryRecvError;
    use std::task::{Context, Poll};

    /// Scripted replacement for the signal stream: each `try_recv` call
    /// consumes the next entry of `values` and is counted.
    struct MockStream {
        total_try_recv: usize,
        values: Vec<Option<()>>,
    }

    impl MockStream {
        fn new(values: Vec<Option<()>>) -> Self {
            let total_try_recv = 0;
            Self {
                total_try_recv,
                values,
            }
        }
    }

    impl InternalStream for MockStream {
        fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
            unimplemented!()
        }

        fn try_recv(&mut self) -> Result<(), TryRecvError> {
            self.total_try_recv += 1;
            // `Vec::remove(0)` panics once the script is exhausted, so a
            // driver that polls more often than scripted fails the test.
            let next = self.values.remove(0);
            next.ok_or(TryRecvError::Empty)
        }
    }

    /// Builds a `CoreDriver` over a scripted stream and a fresh mock queue.
    fn driver_with(script: Vec<Option<()>>) -> CoreDriver<MockStream, MockQueue<()>> {
        CoreDriver {
            sigchild: MockStream::new(script),
            orphan_queue: MockQueue::<()>::new(),
        }
    }

    #[test]
    fn no_reap_if_no_signal() {
        let mut core = driver_with(vec![None]);

        core.process();

        // One poll to discover there is nothing pending, and no reap pass.
        assert_eq!(1, core.sigchild.total_try_recv);
        assert_eq!(0, core.orphan_queue.total_reaps.get());
    }

    #[test]
    fn coalesce_signals_before_reaping() {
        let mut core = driver_with(vec![Some(()), Some(()), None]);

        core.process();

        // Two buffered signals plus the terminating empty poll, one reap pass.
        assert_eq!(3, core.sigchild.total_try_recv);
        assert_eq!(1, core.orphan_queue.total_reaps.get());
    }
}
30 changes: 17 additions & 13 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -21,8 +21,10 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...

mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
pub(crate) mod driver;

pub(crate) mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, ReapOrphanQueue, Wait};

mod reap;
use reap::Reaper;
Expand All @@ -39,11 +41,11 @@ use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::process::ExitStatus;
use std::process::{Child as StdChild, ExitStatus};
use std::task::Context;
use std::task::Poll;

impl Wait for std::process::Child {
impl Wait for StdChild {
fn id(&self) -> u32 {
self.id()
}
Expand All @@ -53,37 +55,39 @@ impl Wait for std::process::Child {
}
}

impl Kill for std::process::Child {
impl Kill for StdChild {
fn kill(&mut self) -> io::Result<()> {
self.kill()
}
}

lazy_static::lazy_static! {
static ref ORPHAN_QUEUE: OrphanQueueImpl<std::process::Child> = OrphanQueueImpl::new();
static ref ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
}

struct GlobalOrphanQueue;
pub(crate) struct GlobalOrphanQueue;

impl fmt::Debug for GlobalOrphanQueue {
    /// Formats the shared `ORPHAN_QUEUE` itself, since this unit struct
    /// carries no state of its own.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        ORPHAN_QUEUE.fmt(fmt)
    }
}

impl OrphanQueue<std::process::Child> for GlobalOrphanQueue {
fn push_orphan(&self, orphan: std::process::Child) {
ORPHAN_QUEUE.push_orphan(orphan)
}

impl ReapOrphanQueue for GlobalOrphanQueue {
    /// Runs a reap pass over the shared `ORPHAN_QUEUE`.
    fn reap_orphans(&self) {
        ORPHAN_QUEUE.reap_orphans()
    }
}

impl OrphanQueue<StdChild> for GlobalOrphanQueue {
    /// Hands `orphan` over to the shared `ORPHAN_QUEUE`.
    fn push_orphan(&self, orphan: StdChild) {
        ORPHAN_QUEUE.push_orphan(orphan)
    }
}

#[must_use = "futures do nothing unless polled"]
pub(crate) struct Child {
inner: Reaper<std::process::Child, GlobalOrphanQueue, Signal>,
inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
}

impl fmt::Debug for Child {
Expand Down
78 changes: 52 additions & 26 deletions tokio/src/process/unix/orphan.rs
Expand Up @@ -20,23 +20,29 @@ impl<T: Wait> Wait for &mut T {
}
}

/// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T> {
/// Adds an orphan to the queue.
fn push_orphan(&self, orphan: T);
/// An interface for reaping a set of orphaned processes.
///
/// Unlike `OrphanQueue<T>`, this trait has no type parameter, so code that
/// only needs to trigger a reap pass does not have to name the orphan type.
pub(crate) trait ReapOrphanQueue {
    /// Attempts to reap every process in the queue, ignoring any errors and
    /// enqueueing any orphans which have not yet exited.
    fn reap_orphans(&self);
}

// Lets a shared reference to a queue be used wherever a queue is expected.
impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
    /// Forwards to the referenced queue.
    fn reap_orphans(&self) {
        (**self).reap_orphans()
    }
}

/// An interface for queueing up an orphaned process so that it can be reaped.
///
/// The `ReapOrphanQueue` supertrait means every queue that accepts orphans
/// can also be asked to reap them.
pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
    /// Adds an orphan to the queue.
    fn push_orphan(&self, orphan: T);
}

impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
fn push_orphan(&self, orphan: T) {
(**self).push_orphan(orphan);
}

fn reap_orphans(&self) {
(**self).reap_orphans()
}
}

/// An implementation of `OrphanQueue`.
Expand All @@ -62,42 +68,62 @@ impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
fn push_orphan(&self, orphan: T) {
self.queue.lock().unwrap().push(orphan)
}
}

impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
fn reap_orphans(&self) {
let mut queue = self.queue.lock().unwrap();
let queue = &mut *queue;

let mut i = 0;
while i < queue.len() {
for i in (0..queue.len()).rev() {
match queue[i].try_wait() {
Ok(Some(_)) => {}
Err(_) => {
// TODO: bubble up the error somehow. Is this an internal bug?
// Should we panic? Is it OK for this to be silently
// dropped?
}
// Still not done yet
Ok(None) => {
i += 1;
continue;
Ok(None) => {}
Ok(Some(_)) | Err(_) => {
// The stdlib handles interruption errors (EINTR) when polling a child process.
// All other errors represent invalid inputs or pids that have already been
// reaped, so we can drop the orphan in case an error is raised.
queue.swap_remove(i);
}
}

queue.remove(i);
}
}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::Wait;
use super::{OrphanQueue, OrphanQueueImpl};
use std::cell::Cell;
pub(crate) mod test {
use super::*;
use std::cell::{Cell, RefCell};
use std::io;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::rc::Rc;

/// Test double for an orphan queue: it records what is pushed and counts
/// reap passes instead of waiting on anything.
pub(crate) struct MockQueue<W> {
    /// Every orphan handed to `push_orphan`, in the order received.
    pub(crate) all_enqueued: RefCell<Vec<W>>,
    /// Number of times `reap_orphans` has been called.
    pub(crate) total_reaps: Cell<usize>,
}

impl<W> MockQueue<W> {
    /// Creates an empty mock queue with a reap count of zero.
    pub(crate) fn new() -> Self {
        Self {
            all_enqueued: RefCell::new(Vec::new()),
            total_reaps: Cell::new(0),
        }
    }
}

impl<W> OrphanQueue<W> for MockQueue<W> {
    /// Records the orphan in `all_enqueued`; nothing is ever waited on.
    fn push_orphan(&self, orphan: W) {
        self.all_enqueued.borrow_mut().push(orphan);
    }
}

impl<W> ReapOrphanQueue for MockQueue<W> {
    /// Increments `total_reaps` and leaves `all_enqueued` untouched.
    fn reap_orphans(&self) {
        self.total_reaps.set(self.total_reaps.get() + 1);
    }
}

struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
Expand Down