Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: use blocking threadpool for child stdio I/O #4824

Merged
merged 4 commits into from Jul 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Expand Up @@ -69,11 +69,11 @@ process = [
"mio/net",
"signal-hook-registry",
"winapi/handleapi",
"winapi/minwindef",
"winapi/processthreadsapi",
"winapi/threadpoollegacyapiset",
"winapi/winbase",
"winapi/winnt",
"winapi/minwindef",
]
# Includes basic task execution capabilities
rt = ["once_cell"]
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/io/blocking.rs
Expand Up @@ -34,8 +34,9 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}

cfg_io_std! {
cfg_io_blocking! {
impl<T> Blocking<T> {
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/poll_evented.rs
Expand Up @@ -136,7 +136,7 @@ impl<E: Source> PollEvented<E> {
}

feature! {
#![any(feature = "net", feature = "process")]
#![any(feature = "net", all(unix, feature = "process"))]
ipetkov marked this conversation as resolved.
Show resolved Hide resolved

use crate::io::ReadBuf;
use std::task::{Context, Poll};
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -431,7 +431,12 @@ cfg_process! {
pub mod process;
}

#[cfg(any(feature = "net", feature = "fs", feature = "io-std"))]
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
all(windows, feature = "process"),
))]
mod blocking;

cfg_rt! {
Expand Down
14 changes: 9 additions & 5 deletions tokio/src/macros/cfg.rs
Expand Up @@ -70,7 +70,11 @@ macro_rules! cfg_fs {

macro_rules! cfg_io_blocking {
($($item:item)*) => {
$( #[cfg(any(feature = "io-std", feature = "fs"))] $item )*
$( #[cfg(any(
feature = "io-std",
feature = "fs",
all(windows, feature = "process"),
))] $item )*
}
}

Expand All @@ -79,12 +83,12 @@ macro_rules! cfg_io_driver {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))))]
$item
Expand All @@ -97,7 +101,7 @@ macro_rules! cfg_io_driver_impl {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
$item
Expand All @@ -110,7 +114,7 @@ macro_rules! cfg_not_io_driver {
$(
#[cfg(not(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)))]
$item
Expand Down
22 changes: 10 additions & 12 deletions tokio/src/process/mod.rs
Expand Up @@ -1285,41 +1285,39 @@ impl ChildStderr {

impl AsyncWrite for ChildStdin {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}

impl AsyncRead for ChildStdout {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

impl AsyncRead for ChildStderr {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

Expand Down
55 changes: 49 additions & 6 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -29,7 +29,7 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;

use crate::io::PollEvented;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
Expand Down Expand Up @@ -177,8 +177,8 @@ impl AsRawFd for Pipe {
}
}

pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;
pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
let mut fd = io.inner.into_inner()?.fd;

// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
Expand Down Expand Up @@ -213,7 +213,50 @@ impl Source for Pipe {
}
}

pub(crate) type ChildStdio = PollEvented<Pipe>;
pub(crate) struct ChildStdio {
inner: PollEvented<Pipe>,
}

impl fmt::Debug for ChildStdio {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(fmt)
}
}

impl AsRawFd for ChildStdio {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

impl AsyncWrite for ChildStdio {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncRead for ChildStdio {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
}
}

fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
Expand All @@ -238,13 +281,13 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}

pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawFd,
{
// Set the fd to nonblocking before we pass it to the event loop
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;

PollEvented::new(pipe)
PollEvented::new(pipe).map(|inner| ChildStdio { inner })
}
99 changes: 84 additions & 15 deletions tokio/src/process/windows.rs
Expand Up @@ -15,22 +15,22 @@
//! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot
//! from then on out.

use crate::io::PollEvented;
use crate::io::{blocking::Blocking, AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::sync::oneshot;

use mio::windows::NamedPipe;
use std::fmt;
use std::fs::File as StdFile;
use std::future::Future;
use std::io;
use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::os::windows::prelude::{AsRawHandle, IntoRawHandle, RawHandle};
use std::pin::Pin;
use std::process::Stdio;
use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
use std::sync::Arc;
use std::task::{Context, Poll};
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::GetCurrentProcess;
Expand Down Expand Up @@ -167,28 +167,97 @@ unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
let _ = complete.take().unwrap().send(());
}

pub(crate) type ChildStdio = PollEvented<NamedPipe>;
#[derive(Debug)]
struct ArcFile(Arc<StdFile>);

pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>>
impl io::Read for ArcFile {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
(&*self.0).read(bytes)
}
}

impl io::Write for ArcFile {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
(&*self.0).write(bytes)
}

fn flush(&mut self) -> io::Result<()> {
(&*self.0).flush()
}
}

#[derive(Debug)]
pub(crate) struct ChildStdio {
// Used for accessing the raw handle, even if the io version is busy
raw: Arc<StdFile>,
// For doing I/O operations asynchronously
io: Blocking<ArcFile>,
}
Comment on lines +189 to +195
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like you could drop the Arc stuff if you just stored the raw handle next to the io field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::process::Stdio wants ownership of the handle (and will close it on drop) which might cause issues if the handle is still being written to in the background. Using the Arc allows us to potentially avoid a syscall to duplicate the handle if it doesn't have any pending I/O on it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could implement a try_unwrap on the Blocking type to get the same behavior without the Arc. But I don't mind either way, so I will approve it and let you decide if you want to implement that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did consider adding try_unwrap to Blocking, the tricky bit is that it moves the handle to and from the blocking threadpool, so if there is a pending I/O operation we cannot access anything. The conversion from ChildStd* to Stdio are synchronous, so we cannot even internally .await things. Hence the Arc approach allows us to at least clone the handle if needed

I'm going to leave it as is for now since we can always revisit it later as needed. Thanks for the review!


impl AsRawHandle for ChildStdio {
fn as_raw_handle(&self) -> RawHandle {
self.raw.as_raw_handle()
}
}

impl AsyncRead for ChildStdio {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_read(cx, buf)
}
}

impl AsyncWrite for ChildStdio {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_shutdown(cx)
}
}

pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawHandle,
{
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
PollEvented::new(pipe)
use std::os::windows::prelude::FromRawHandle;

let raw = Arc::new(unsafe { StdFile::from_raw_handle(io.into_raw_handle()) });
let io = Blocking::new(ArcFile(raw.clone()));
Ok(ChildStdio { raw, io })
}

pub(crate) fn convert_to_stdio(child_stdio: ChildStdio) -> io::Result<Stdio> {
let ChildStdio { raw, io } = child_stdio;
drop(io); // Try to drop the Arc count here

Arc::try_unwrap(raw)
.or_else(|raw| duplicate_handle(&*raw))
.map(Stdio::from)
}

pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
let named_pipe = io.into_inner()?;
fn duplicate_handle<T: AsRawHandle>(io: &T) -> io::Result<StdFile> {
use std::os::windows::prelude::FromRawHandle;

// Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
// duplicate the handle here...
unsafe {
let mut dup_handle = INVALID_HANDLE_VALUE;
let cur_proc = GetCurrentProcess();

let status = DuplicateHandle(
cur_proc,
named_pipe.as_raw_handle(),
io.as_raw_handle(),
cur_proc,
&mut dup_handle,
0 as DWORD,
Expand All @@ -200,6 +269,6 @@ pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio>
return Err(io::Error::last_os_error());
}

Ok(Stdio::from_raw_handle(dup_handle))
Ok(StdFile::from_raw_handle(dup_handle))
}
}
6 changes: 5 additions & 1 deletion tokio/src/runtime/builder.rs
Expand Up @@ -272,7 +272,11 @@ impl Builder {
/// .unwrap();
/// ```
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))]
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal")
))]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();
Expand Down