Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: use blocking threadpool for child stdio I/O #4824

Merged
merged 4 commits into from Jul 23, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Expand Up @@ -69,11 +69,11 @@ process = [
"mio/net",
"signal-hook-registry",
"winapi/handleapi",
"winapi/minwindef",
"winapi/processthreadsapi",
"winapi/threadpoollegacyapiset",
"winapi/winbase",
"winapi/winnt",
"winapi/minwindef",
]
# Includes basic task execution capabilities
rt = ["once_cell"]
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/io/blocking.rs
Expand Up @@ -34,8 +34,9 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}

cfg_io_std! {
cfg_io_blocking! {
impl<T> Blocking<T> {
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/poll_evented.rs
Expand Up @@ -136,7 +136,7 @@ impl<E: Source> PollEvented<E> {
}

feature! {
#![any(feature = "net", feature = "process")]
#![any(feature = "net", all(unix, feature = "process"))]
ipetkov marked this conversation as resolved.
Show resolved Hide resolved

use crate::io::ReadBuf;
use std::task::{Context, Poll};
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -431,7 +431,12 @@ cfg_process! {
pub mod process;
}

#[cfg(any(feature = "net", feature = "fs", feature = "io-std"))]
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
all(windows, feature = "process"),
))]
mod blocking;

cfg_rt! {
Expand Down
14 changes: 9 additions & 5 deletions tokio/src/macros/cfg.rs
Expand Up @@ -70,7 +70,11 @@ macro_rules! cfg_fs {

macro_rules! cfg_io_blocking {
($($item:item)*) => {
$( #[cfg(any(feature = "io-std", feature = "fs"))] $item )*
$( #[cfg(any(
feature = "io-std",
feature = "fs",
all(windows, feature = "process"),
))] $item )*
}
}

Expand All @@ -79,12 +83,12 @@ macro_rules! cfg_io_driver {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))))]
$item
Expand All @@ -97,7 +101,7 @@ macro_rules! cfg_io_driver_impl {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
$item
Expand All @@ -110,7 +114,7 @@ macro_rules! cfg_not_io_driver {
$(
#[cfg(not(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)))]
$item
Expand Down
68 changes: 38 additions & 30 deletions tokio/src/process/mod.rs
Expand Up @@ -245,6 +245,7 @@ mod kill;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;

use pin_project_lite::pin_project;
use std::convert::TryInto;
use std::ffi::OsStr;
use std::future::Future;
Expand Down Expand Up @@ -1211,31 +1212,40 @@ impl Child {
}
}

/// The standard input stream for spawned children.
///
/// This type implements the `AsyncWrite` trait to pass data to the stdin handle of
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdin {
inner: imp::ChildStdio,
pin_project! {
/// The standard input stream for spawned children.
///
/// This type implements the `AsyncWrite` trait to pass data to the stdin handle of
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdin {
#[pin]
ipetkov marked this conversation as resolved.
Show resolved Hide resolved
inner: imp::ChildStdio,
}
}

/// The standard output stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stdout
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdout {
inner: imp::ChildStdio,
pin_project! {
/// The standard output stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stdout
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdout {
#[pin]
inner: imp::ChildStdio,
}
}

/// The standard error stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stderr
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStderr {
inner: imp::ChildStdio,
pin_project! {
/// The standard error stream for spawned children.
///
/// This type implements the `AsyncRead` trait to read data from the stderr
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStderr {
#[pin]
inner: imp::ChildStdio,
}
}

impl ChildStdin {
Expand Down Expand Up @@ -1289,15 +1299,15 @@ impl AsyncWrite for ChildStdin {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
self.project().inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
}

Expand All @@ -1307,8 +1317,7 @@ impl AsyncRead for ChildStdout {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
self.project().inner.poll_read(cx, buf)
}
}

Expand All @@ -1318,8 +1327,7 @@ impl AsyncRead for ChildStderr {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
self.project().inner.poll_read(cx, buf)
}
}

Expand Down
55 changes: 49 additions & 6 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -29,7 +29,7 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;

use crate::io::PollEvented;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
Expand Down Expand Up @@ -177,8 +177,8 @@ impl AsRawFd for Pipe {
}
}

pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;
pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
let mut fd = io.inner.into_inner()?.fd;

// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
Expand Down Expand Up @@ -213,7 +213,50 @@ impl Source for Pipe {
}
}

pub(crate) type ChildStdio = PollEvented<Pipe>;
pub(crate) struct ChildStdio {
inner: PollEvented<Pipe>,
}

impl fmt::Debug for ChildStdio {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(fmt)
}
}

impl AsRawFd for ChildStdio {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

impl AsyncWrite for ChildStdio {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncRead for ChildStdio {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
}
}

fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
Expand All @@ -238,13 +281,13 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}

pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawFd,
{
// Set the fd to nonblocking before we pass it to the event loop
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;

PollEvented::new(pipe)
PollEvented::new(pipe).map(|inner| ChildStdio { inner })
}