Skip to content

Commit

Permalink
process: use blocking threadpool for child stdio I/O (#4824)
Browse files Browse the repository at this point in the history
  • Loading branch information
ipetkov committed Jul 23, 2022
1 parent 8c482a5 commit 1578575
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 44 deletions.
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Expand Up @@ -69,11 +69,11 @@ process = [
"mio/net",
"signal-hook-registry",
"winapi/handleapi",
"winapi/minwindef",
"winapi/processthreadsapi",
"winapi/threadpoollegacyapiset",
"winapi/winbase",
"winapi/winnt",
"winapi/minwindef",
]
# Includes basic task execution capabilities
rt = ["once_cell"]
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/io/blocking.rs
Expand Up @@ -34,8 +34,9 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}

cfg_io_std! {
cfg_io_blocking! {
impl<T> Blocking<T> {
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/poll_evented.rs
Expand Up @@ -136,7 +136,7 @@ impl<E: Source> PollEvented<E> {
}

feature! {
#![any(feature = "net", feature = "process")]
#![any(feature = "net", all(unix, feature = "process"))]

use crate::io::ReadBuf;
use std::task::{Context, Poll};
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -431,7 +431,12 @@ cfg_process! {
pub mod process;
}

#[cfg(any(feature = "net", feature = "fs", feature = "io-std"))]
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
all(windows, feature = "process"),
))]
mod blocking;

cfg_rt! {
Expand Down
14 changes: 9 additions & 5 deletions tokio/src/macros/cfg.rs
Expand Up @@ -70,7 +70,11 @@ macro_rules! cfg_fs {

macro_rules! cfg_io_blocking {
($($item:item)*) => {
$( #[cfg(any(feature = "io-std", feature = "fs"))] $item )*
$( #[cfg(any(
feature = "io-std",
feature = "fs",
all(windows, feature = "process"),
))] $item )*
}
}

Expand All @@ -79,12 +83,12 @@ macro_rules! cfg_io_driver {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))))]
$item
Expand All @@ -97,7 +101,7 @@ macro_rules! cfg_io_driver_impl {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
$item
Expand All @@ -110,7 +114,7 @@ macro_rules! cfg_not_io_driver {
$(
#[cfg(not(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)))]
$item
Expand Down
22 changes: 10 additions & 12 deletions tokio/src/process/mod.rs
Expand Up @@ -1285,41 +1285,39 @@ impl ChildStderr {

impl AsyncWrite for ChildStdin {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}

impl AsyncRead for ChildStdout {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

impl AsyncRead for ChildStderr {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

Expand Down
55 changes: 49 additions & 6 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -29,7 +29,7 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;

use crate::io::PollEvented;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
Expand Down Expand Up @@ -177,8 +177,8 @@ impl AsRawFd for Pipe {
}
}

pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;
pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
let mut fd = io.inner.into_inner()?.fd;

// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
Expand Down Expand Up @@ -213,7 +213,50 @@ impl Source for Pipe {
}
}

pub(crate) type ChildStdio = PollEvented<Pipe>;
pub(crate) struct ChildStdio {
inner: PollEvented<Pipe>,
}

impl fmt::Debug for ChildStdio {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(fmt)
}
}

impl AsRawFd for ChildStdio {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

impl AsyncWrite for ChildStdio {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncRead for ChildStdio {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
}
}

fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
Expand All @@ -238,13 +281,13 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}

pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawFd,
{
// Set the fd to nonblocking before we pass it to the event loop
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;

PollEvented::new(pipe)
PollEvented::new(pipe).map(|inner| ChildStdio { inner })
}
99 changes: 84 additions & 15 deletions tokio/src/process/windows.rs
Expand Up @@ -15,22 +15,22 @@
//! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot
//! from then on out.

use crate::io::PollEvented;
use crate::io::{blocking::Blocking, AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::sync::oneshot;

use mio::windows::NamedPipe;
use std::fmt;
use std::fs::File as StdFile;
use std::future::Future;
use std::io;
use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::os::windows::prelude::{AsRawHandle, IntoRawHandle, RawHandle};
use std::pin::Pin;
use std::process::Stdio;
use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
use std::sync::Arc;
use std::task::{Context, Poll};
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::GetCurrentProcess;
Expand Down Expand Up @@ -167,28 +167,97 @@ unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
let _ = complete.take().unwrap().send(());
}

pub(crate) type ChildStdio = PollEvented<NamedPipe>;
#[derive(Debug)]
struct ArcFile(Arc<StdFile>);

pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>>
impl io::Read for ArcFile {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
(&*self.0).read(bytes)
}
}

impl io::Write for ArcFile {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
(&*self.0).write(bytes)
}

fn flush(&mut self) -> io::Result<()> {
(&*self.0).flush()
}
}

#[derive(Debug)]
pub(crate) struct ChildStdio {
// Used for accessing the raw handle, even if the io version is busy
raw: Arc<StdFile>,
// For doing I/O operations asynchronously
io: Blocking<ArcFile>,
}

impl AsRawHandle for ChildStdio {
fn as_raw_handle(&self) -> RawHandle {
self.raw.as_raw_handle()
}
}

impl AsyncRead for ChildStdio {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_read(cx, buf)
}
}

impl AsyncWrite for ChildStdio {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_shutdown(cx)
}
}

pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawHandle,
{
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
PollEvented::new(pipe)
use std::os::windows::prelude::FromRawHandle;

let raw = Arc::new(unsafe { StdFile::from_raw_handle(io.into_raw_handle()) });
let io = Blocking::new(ArcFile(raw.clone()));
Ok(ChildStdio { raw, io })
}

pub(crate) fn convert_to_stdio(child_stdio: ChildStdio) -> io::Result<Stdio> {
let ChildStdio { raw, io } = child_stdio;
drop(io); // Try to drop the Arc count here

Arc::try_unwrap(raw)
.or_else(|raw| duplicate_handle(&*raw))
.map(Stdio::from)
}

pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
let named_pipe = io.into_inner()?;
fn duplicate_handle<T: AsRawHandle>(io: &T) -> io::Result<StdFile> {
use std::os::windows::prelude::FromRawHandle;

// Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
// duplicate the handle here...
unsafe {
let mut dup_handle = INVALID_HANDLE_VALUE;
let cur_proc = GetCurrentProcess();

let status = DuplicateHandle(
cur_proc,
named_pipe.as_raw_handle(),
io.as_raw_handle(),
cur_proc,
&mut dup_handle,
0 as DWORD,
Expand All @@ -200,6 +269,6 @@ pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio>
return Err(io::Error::last_os_error());
}

Ok(Stdio::from_raw_handle(dup_handle))
Ok(StdFile::from_raw_handle(dup_handle))
}
}
6 changes: 5 additions & 1 deletion tokio/src/runtime/builder.rs
Expand Up @@ -272,7 +272,11 @@ impl Builder {
/// .unwrap();
/// ```
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))]
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal")
))]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();
Expand Down

0 comments on commit 1578575

Please sign in to comment.