diff --git a/Cargo.toml b/Cargo.toml index bcbfffe5..bb9cfc98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,8 @@ edition = "2018" rust-version = "1.63" [dependencies] -jobserver = { version = "0.1.20", default-features = false, optional = true } +jobserver = { version = "0.1.30", default-features = false, optional = true } +once_cell = { version = "1.19", optional = true } [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 @@ -28,7 +29,7 @@ jobserver = { version = "0.1.20", default-features = false, optional = true } libc = { version = "0.2.62", default-features = false, optional = true } [features] -parallel = ["libc", "jobserver"] +parallel = ["libc", "jobserver", "once_cell"] [dev-dependencies] tempfile = "3" diff --git a/src/lib.rs b/src/lib.rs index 4df12680..1830b9da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1449,7 +1449,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let tokens = parallel::job_token::ActiveJobTokenServer::new()?; + let tokens = parallel::job_token::ActiveJobTokenServer::new(); // When compiling objects in parallel we do a few dirty tricks to speed // things up: diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index 17b2d47d..31dbfe83 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -1,7 +1,9 @@ -use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; +use std::marker::PhantomData; use crate::Error; +use once_cell::sync::OnceCell; + pub(crate) struct JobToken(PhantomData<()>); impl JobToken { @@ -35,18 +37,13 @@ impl JobTokenServer { /// compilation. fn new() -> &'static Self { // TODO: Replace with a OnceLock once MSRV is 1.70 - static INIT: Once = Once::new(); - static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); - - unsafe { - INIT.call_once(|| { - let server = inherited_jobserver::JobServer::from_env() - .map(Self::Inherited) - .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); - JOBSERVER.write(server); - }); - JOBSERVER.assume_init_ref() - } + static JOBSERVER: OnceCell = OnceCell::new(); + + JOBSERVER.get_or_init(|| { + unsafe { inherited_jobserver::JobServer::from_env() } + .map(Self::Inherited) + .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())) + }) } } @@ -56,14 +53,12 @@ pub(crate) enum ActiveJobTokenServer { } impl ActiveJobTokenServer { - pub(crate) fn new() -> Result { + pub(crate) fn new() -> Self { match JobTokenServer::new() { JobTokenServer::Inherited(inherited_jobserver) => { - inherited_jobserver.enter_active().map(Self::Inherited) - } - JobTokenServer::InProcess(inprocess_jobserver) => { - Ok(Self::InProcess(inprocess_jobserver)) + Self::Inherited(inherited_jobserver.enter_active()) } + JobTokenServer::InProcess(inprocess_jobserver) => Self::InProcess(inprocess_jobserver), } } @@ -76,7 +71,7 @@ impl ActiveJobTokenServer { } mod inherited_jobserver { - use super::JobToken; + use super::{JobToken, OnceCell}; use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; @@ -139,31 +134,39 @@ mod inherited_jobserver { } } - pub(super) fn enter_active(&self) -> Result, Error> { - ActiveJobServer::new(self) + pub(super) fn enter_active(&self) -> ActiveJobServer<'_> { + ActiveJobServer { + jobserver: self, + helper_thread: OnceCell::new(), + } } } - pub(crate) struct ActiveJobServer<'a> { - jobserver: &'a JobServer, - helper_thread: jobserver::HelperThread, + struct HelperThread { + inner: jobserver::HelperThread, /// When rx is dropped, all the token stored within it will be dropped. rx: mpsc::Receiver>, } - impl<'a> ActiveJobServer<'a> { - fn new(jobserver: &'a JobServer) -> Result { + impl HelperThread { + fn new(jobserver: &JobServer) -> Result { let (tx, rx) = mpsc::channel(); Ok(Self { rx, - helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { + inner: jobserver.inner.clone().into_helper_thread(move |res| { let _ = tx.send(res); })?, - jobserver, }) } + } + + pub(crate) struct ActiveJobServer<'a> { + jobserver: &'a JobServer, + helper_thread: OnceCell, + } + impl<'a> ActiveJobServer<'a> { pub(super) async fn acquire(&self) -> Result { let mut has_requested_token = false; @@ -173,26 +176,38 @@ mod inherited_jobserver { break Ok(JobToken::new()); } - // Cold path, no global implicit token, obtain one - match self.rx.try_recv() { - Ok(res) => { - let acquired = res?; + match self.jobserver.inner.try_acquire() { + Ok(Some(acquired)) => { acquired.drop_without_releasing(); break Ok(JobToken::new()); } - Err(mpsc::TryRecvError::Disconnected) => { - break Err(Error::new( - ErrorKind::JobserverHelpThreadError, - "jobserver help thread has returned before ActiveJobServer is dropped", - )) - } - Err(mpsc::TryRecvError::Empty) => { - if !has_requested_token { - self.helper_thread.request_token(); - has_requested_token = true; + Ok(None) => YieldOnce::default().await, + Err(err) if err.kind() == io::ErrorKind::Unsupported => { + // Fallback to creating a help thread with blocking acquire + let helper_thread = self + .helper_thread + .get_or_try_init(|| HelperThread::new(&self.jobserver))?; + + match helper_thread.rx.try_recv() { + Ok(res) => { + let acquired = res?; + acquired.drop_without_releasing(); + break Ok(JobToken::new()); + } + Err(mpsc::TryRecvError::Disconnected) => break Err(Error::new( + ErrorKind::JobserverHelpThreadError, + "jobserver help thread has returned before ActiveJobServer is dropped", + )), + Err(mpsc::TryRecvError::Empty) => { + if !has_requested_token { + helper_thread.inner.request_token(); + has_requested_token = true; + } + YieldOnce::default().await + } } - YieldOnce::default().await } + Err(err) => break Err(err.into()), } } }