Skip to content

Commit

Permalink
Optimize inherited_jobserver acquire
Browse files Browse the repository at this point in the history
First try `jobserver::Client::try_acquire`, which will work:
 - If a fifo is used as jobserver
 - On linux and:
    - preadv2 with non-blocking read available (>=5.6)
    - /proc is available
 - On Windows
 - On wasm

if not, we will simply fallback to help thread implementation, spawning one
thread to maintain compatibility with other platforms.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
  • Loading branch information
NobodyXu committed Apr 18, 2024
1 parent 173e649 commit e9ce514
Showing 1 changed file with 47 additions and 24 deletions.
71 changes: 47 additions & 24 deletions src/parallel/job_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ impl ActiveJobTokenServer {
mod inherited_jobserver {
use super::JobToken;

use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};
use crate::{
parallel::{async_executor::YieldOnce, once_lock::OnceLock},
Error, ErrorKind,
};

use std::{
io, mem,
Expand Down Expand Up @@ -140,30 +143,38 @@ mod inherited_jobserver {
}

pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
ActiveJobServer::new(self)
Ok(ActiveJobServer {
jobserver: self,
helper_thread: OnceLock::new(),
})
}
}

pub(crate) struct ActiveJobServer<'a> {
jobserver: &'a JobServer,
helper_thread: jobserver::HelperThread,
struct HelperThread {
inner: jobserver::HelperThread,
/// When rx is dropped, all the token stored within it will be dropped.
rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
}

impl<'a> ActiveJobServer<'a> {
fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
impl HelperThread {
fn new(jobserver: &JobServer) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel();

Ok(Self {
rx,
helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
inner: jobserver.inner.clone().into_helper_thread(move |res| {
let _ = tx.send(res);
})?,
jobserver,
})
}
}

pub(crate) struct ActiveJobServer<'a> {
jobserver: &'a JobServer,
helper_thread: OnceLock<HelperThread>,
}

impl<'a> ActiveJobServer<'a> {
pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
let mut has_requested_token = false;

Expand All @@ -173,26 +184,38 @@ mod inherited_jobserver {
break Ok(JobToken::new());
}

// Cold path, no global implicit token, obtain one
match self.rx.try_recv() {
Ok(res) => {
let acquired = res?;
match self.jobserver.inner.try_acquire() {
Ok(Some(acquired)) => {
acquired.drop_without_releasing();
break Ok(JobToken::new());
}
Err(mpsc::TryRecvError::Disconnected) => {
break Err(Error::new(
ErrorKind::JobserverHelpThreadError,
"jobserver help thread has returned before ActiveJobServer is dropped",
))
}
Err(mpsc::TryRecvError::Empty) => {
if !has_requested_token {
self.helper_thread.request_token();
has_requested_token = true;
Ok(None) => YieldOnce::default().await,
Err(err) if err.kind() == io::ErrorKind::Unsupported => {
// Fallback to creating a help thread with blocking acquire
let helper_thread = self
.helper_thread
.get_or_try_init(|| HelperThread::new(&self.jobserver))?;

match helper_thread.rx.try_recv() {
Ok(res) => {
let acquired = res?;
acquired.drop_without_releasing();
break Ok(JobToken::new());
}
Err(mpsc::TryRecvError::Disconnected) => break Err(Error::new(
ErrorKind::JobserverHelpThreadError,
"jobserver help thread has returned before ActiveJobServer is dropped",
)),
Err(mpsc::TryRecvError::Empty) => {
if !has_requested_token {
helper_thread.inner.request_token();
has_requested_token = true;
}
YieldOnce::default().await
}
}
YieldOnce::default().await
}
Err(err) => break Err(err.into()),
}
}
}
Expand Down

0 comments on commit e9ce514

Please sign in to comment.