Skip to content

Commit

Permalink
Optimize tokio::child::windows::ChildImp::wait (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
NobodyXu committed Nov 13, 2023
1 parent d24db44 commit 86c329e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
39 changes: 27 additions & 12 deletions src/tokio/child/windows.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io::Result, mem, process::ExitStatus};
use std::{io::Result, mem, ops::ControlFlow, process::ExitStatus};
use tokio::{
process::{Child, ChildStderr, ChildStdin, ChildStdout},
task::spawn_blocking,
Expand Down Expand Up @@ -67,15 +67,15 @@ impl ChildImp {
self.inner.id()
}

fn wait_imp(handles: JobPort, timeout: DWORD) -> Result<()> {
fn wait_imp(completion_port: ThreadSafeRawHandle, timeout: DWORD) -> Result<ControlFlow<()>> {
let mut code: DWORD = 0;
let mut key: ULONG_PTR = 0;
let mut overlapped = mem::MaybeUninit::<OVERLAPPED>::uninit();
let mut lp_overlapped = overlapped.as_mut_ptr();

let result = unsafe {
GetQueuedCompletionStatus(
handles.completion_port,
completion_port.0,
&mut code,
&mut key,
&mut lp_overlapped,
Expand All @@ -86,25 +86,40 @@ impl ChildImp {
// ignore timing out errors unless the timeout was specified to INFINITE
// https://docs.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus
if timeout != INFINITE && result == FALSE && lp_overlapped.is_null() {
return Ok(());
return Ok(ControlFlow::Continue(()));
}

res_bool(result)?;

// don't drop them
mem::forget(handles);

Ok(())
Ok(ControlFlow::Break(()))
}

pub async fn wait(&mut self) -> Result<ExitStatus> {
let handles = self.handles.clone();
spawn_blocking(|| Self::wait_imp(handles, INFINITE)).await??;
self.inner.wait().await
const MAX_RETRY_ATTEMPT: usize = 10;

// Always wait for parent to exit first.
//
// It's likely that all its children has already exited and reaped by
// the time the parent exits.
let status = self.inner.wait().await?;

let completion_port = ThreadSafeRawHandle(self.handles.completion_port);

// Try waiting for group exit, if it is still alive after several
// attempts, then spawn a blocking task to reap them.
for retry_attempt in 1..=MAX_RETRY_ATTEMPT {
if Self::wait_imp(completion_port, 0)?.is_break() {
break;
} else if retry_attempt == MAX_RETRY_ATTEMPT {
spawn_blocking(move || Self::wait_imp(completion_port, INFINITE)).await??;
}
}

Ok(status)
}

pub fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
Self::wait_imp(self.handles.clone(), 0)?;
Self::wait_imp(ThreadSafeRawHandle(self.handles.completion_port), 0)?;
self.inner.try_wait()
}
}
7 changes: 7 additions & 0 deletions src/winres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ impl Drop for JobPort {
unsafe impl Send for JobPort {}
unsafe impl Sync for JobPort {}

#[derive(Copy, Clone)]
#[repr(transparent)]
pub(crate) struct ThreadSafeRawHandle(pub HANDLE);

unsafe impl Send for ThreadSafeRawHandle {}
unsafe impl Sync for ThreadSafeRawHandle {}

pub(crate) fn res_null(handle: HANDLE) -> Result<HANDLE> {
if handle.is_null() {
Err(Error::last_os_error())
Expand Down

0 comments on commit 86c329e

Please sign in to comment.