Build each node individually
Now nodes that take a long time to build won't bottleneck the
deployment of other nodes in the same chunk.

Fixes #47.
zhaofengli committed Dec 8, 2021
1 parent ea09e60 commit eebded1
Showing 10 changed files with 136 additions and 197 deletions.
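
The gist of the change: instead of building a whole chunk's profiles in one job and only then deploying, each node's deployment task now builds its own profile before pushing it, so a slow build only delays that node. Below is a minimal sketch of the pattern (not Colmena's actual API — `build_profile` and `push_and_activate` are hypothetical stand-ins for realizing a node's profile derivation and applying it on the target host; requires the tokio and futures crates):

```rust
use futures::future::join_all;
use tokio::time::{sleep, Duration};

// Hypothetical stand-in for realizing a node's profile derivation.
async fn build_profile(node: &str) -> Result<String, String> {
    let secs = if node == "slow" { 3 } else { 1 };
    sleep(Duration::from_secs(secs)).await; // simulate builds of uneven length
    Ok(format!("/nix/store/...-{}-system", node))
}

// Hypothetical stand-in for pushing the closure and activating it remotely.
async fn push_and_activate(node: &str, profile: &str) -> Result<(), String> {
    sleep(Duration::from_millis(200)).await;
    println!("{}: activated {}", node, profile);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let nodes = ["alpha", "beta", "slow"];

    // One build-then-deploy future per node: a slow build only delays
    // that node, not every other node in the chunk.
    let tasks = nodes.into_iter().map(|node| async move {
        let profile = build_profile(node).await?;
        push_and_activate(node, &profile).await
    });

    join_all(tasks).await.into_iter().collect::<Result<Vec<()>, _>>()?;
    Ok(())
}
```

Under the old per-chunk flow, the slow node would have held back alpha and beta until its build finished; here each node proceeds as soon as its own build is done.
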
24 changes: 1 addition & 23 deletions src/job.rs
@@ -13,7 +13,7 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::time;
use uuid::Uuid;

use crate::nix::{NixResult, NixError, NodeName, ProfileMap};
use crate::nix::{NixResult, NixError, NodeName};
use crate::progress::{Sender as ProgressSender, Message as ProgressMessage, Line, LineStyle};

pub type Sender = UnboundedSender<Event>;
@@ -208,9 +208,6 @@ pub enum EventPayload {
/// The job wants to transition to a new state.
NewState(JobState),

/// The job built a set of system profiles.
ProfilesBuilt(ProfileMap),

/// The child process printed a line to stdout.
ChildStdout(String),

@@ -348,19 +345,6 @@ impl JobMonitor {
self.print_job_stats();
}
}
EventPayload::ProfilesBuilt(profiles) => {
if let Some(sender) = &self.progress {
for (name, profile) in profiles.iter() {
let text = format!("Built {:?}", profile.as_path());
let line = Line::new(message.job_id, text)
.label(name.as_str().to_string())
.one_off()
.style(LineStyle::Success);
let pm = self.get_print_message(message.job_id, line);
sender.send(pm).unwrap();
}
}
}
EventPayload::ChildStdout(m) | EventPayload::ChildStderr(m) | EventPayload::Message(m) => {
if let Some(sender) = &self.progress {
let metadata = &self.jobs[&message.job_id];
@@ -598,11 +582,6 @@ impl JobHandleInner {
self.send_payload(EventPayload::Failure(error.to_string()))
}

/// Sends a set of built profiles.
pub fn profiles_built(&self, profiles: ProfileMap) -> NixResult<()> {
self.send_payload(EventPayload::ProfilesBuilt(profiles))
}

/// Runs a closure, automatically updating the job monitor based on the result.
async fn run_internal<F, U, T>(self: Arc<Self>, f: U, report_running: bool) -> NixResult<T>
where U: FnOnce(Arc<Self>) -> F,
@@ -788,7 +767,6 @@ impl Display for EventPayload {
EventPayload::Noop(m) => write!(f, " noop) {}", m)?,
EventPayload::Failure(e) => write!(f, " failure) {}", e)?,
EventPayload::ShutdownMonitor => write!(f, "shutdown)")?,
EventPayload::ProfilesBuilt(pm) => write!(f, " built) {:?}", pm)?,
}

Ok(())
4 changes: 0 additions & 4 deletions src/nix/deployment/limits.rs
@@ -14,9 +14,6 @@ pub struct ParallelismLimit {
/// Limit of concurrent evaluation processes.
pub evaluation: Semaphore,

/// Limit of concurrent build processes.
pub build: Semaphore,

/// Limit of concurrent apply processes.
pub apply: Semaphore,
}
@@ -25,7 +22,6 @@ impl Default for ParallelismLimit {
fn default() -> Self {
Self {
evaluation: Semaphore::new(1),
build: Semaphore::new(2),
apply: Semaphore::new(10),
}
}
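
With builds folded into each node's deployment task, the dedicated build semaphore goes away and the apply semaphore now bounds the whole per-node pipeline. A small sketch of how a tokio `Semaphore` caps concurrency (illustrative only; in the real code the permit's scope spans build, push, and activation for one node):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 10 node pipelines run at once, matching the default
    // `apply: Semaphore::new(10)` above.
    let apply = Arc::new(Semaphore::new(10));

    let handles: Vec<_> = (0..25)
        .map(|i| {
            let apply = apply.clone();
            tokio::spawn(async move {
                // Waits here once 10 permits are out; released on drop.
                let _permit = apply.acquire_owned().await.unwrap();
                println!("deploying node {}", i);
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }
}
```
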
115 changes: 53 additions & 62 deletions src/nix/deployment/mod.rs
@@ -29,8 +29,7 @@ use super::{
NixError,
NixResult,
Profile,
ProfileMap,
StoreDerivation,
ProfileDerivation,
CopyDirection,
key::{Key, UploadAt as UploadKeyAt},
};
@@ -54,6 +53,9 @@ pub struct Deployment {
/// Deployment options.
options: Options,

/// Options passed to Nix invocations.
nix_options: Vec<String>,

/// Handle to send messages to the ProgressOutput.
progress: Option<ProgressSender>,

@@ -102,12 +104,13 @@ impl Deployment {
Self {
hive,
goal,
options: Options::default(),
nix_options: Vec::new(),
progress,
nodes: targets.keys().cloned().collect(),
targets,
parallelism_limit: ParallelismLimit::default(),
evaluation_node_limit: EvaluationNodeLimit::default(),
options: Options::default(),
executed: false,
}
}
@@ -129,6 +132,9 @@ impl Deployment {
monitor.set_label_width(width);
}

let nix_options = self.hive.nix_options().await?;
self.nix_options = nix_options;

if self.goal == Goal::UploadKeys {
// Just upload keys
let targets = mem::take(&mut self.targets);
@@ -218,45 +224,24 @@
}

let nodes: Vec<NodeName> = chunk.keys().cloned().collect();
let profiles = self.clone().build_nodes(parent.clone(), nodes.clone()).await?;

if self.goal == Goal::Build {
return Ok(());
}
let profile_drvs = self.clone().evaluate_nodes(parent.clone(), nodes.clone()).await?;

let mut futures = Vec::new();

for (name, profile) in profiles.iter() {
for (name, profile_drv) in profile_drvs.iter() {
let target = chunk.remove(name).unwrap();
futures.push(self.clone().deploy_node(parent.clone(), target, profile.clone()));
futures.push(self.clone().deploy_node(parent.clone(), target, profile_drv.clone()));
}

join_all(futures).await
.into_iter().collect::<NixResult<Vec<()>>>()?;

// Create GC root
if self.options.create_gc_roots {
let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?;
let arc_self = self.clone();
job.run_waiting(|job| async move {
if let Some(dir) = arc_self.hive.context_dir() {
job.state(JobState::Running)?;
let base = dir.join(".gcroots");

profiles.create_gc_roots(&base).await?;
} else {
job.noop("No context directory to create GC roots in".to_string())?;
}
Ok(())
}).await?;
}

Ok(())
}

/// Evaluates a set of nodes, returning a store derivation.
/// Evaluates a set of nodes, returning their corresponding store derivations.
async fn evaluate_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
-> NixResult<StoreDerivation<ProfileMap>>
-> NixResult<HashMap<NodeName, ProfileDerivation>>
{
let job = parent.create_job(JobType::Evaluate, nodes.clone())?;

@@ -272,33 +257,6 @@ impl Deployment {
}).await
}

/// Builds a set of nodes, returning a set of profiles.
async fn build_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec<NodeName>)
-> NixResult<ProfileMap>
{
let job = parent.create_job(JobType::Build, nodes.clone())?;

job.run_waiting(|job| async move {
let derivation = self.clone().evaluate_nodes(job.clone(), nodes.clone()).await?;

// Wait for build limit
let permit = self.parallelism_limit.apply.acquire().await.unwrap();
job.state(JobState::Running)?;

// FIXME: Remote builder?
let nix_options = self.hive.nix_options().await.unwrap();
let mut builder = host::local(nix_options);
builder.set_job(Some(job.clone()));

let map = derivation.realize(&mut *builder).await?;

job.profiles_built(map.clone())?;

drop(permit);
Ok(map)
}).await
}

/// Only uploads keys to a node.
async fn upload_keys_to_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode) -> NixResult<()> {
let nodes = vec![target.name.clone()];
@@ -315,20 +273,36 @@ impl Deployment {
}).await
}

/// Pushes and optionally activates a system profile on a given node.
/// Builds, pushes, and optionally activates a system profile on a node.
///
/// This will also upload keys to the node.
async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile: Profile)
async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile_drv: ProfileDerivation)
-> NixResult<()>
{
if self.goal == Goal::Build {
unreachable!();
}

let nodes = vec![target.name.clone()];
let target_name = target.name.clone();

let permit = self.parallelism_limit.apply.acquire().await.unwrap();

// Build system profile
let build_job = parent.create_job(JobType::Build, nodes.clone())?;
let arc_self = self.clone();
let profile: Profile = build_job.run(|job| async move {
// FIXME: Remote builder?
let mut builder = host::local(arc_self.nix_options.clone());
builder.set_job(Some(job.clone()));

let profile = profile_drv.realize(&mut *builder).await?;

job.success_with_message(format!("Built {:?}", profile.as_path()))?;
Ok(profile)
}).await?;

if self.goal == Goal::Build {
return Ok(());
}

// Push closure to remote
let push_job = parent.create_job(JobType::Push, nodes.clone())?;
let push_profile = profile.clone();
let arc_self = self.clone();
@@ -437,6 +411,23 @@ impl Deployment {
}).await?;
}

// Create GC root
if self.options.create_gc_roots {
let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?;
let arc_self = self.clone();
job.run_waiting(|job| async move {
if let Some(dir) = arc_self.hive.context_dir() {
job.state(JobState::Running)?;
let path = dir.join(".gcroots").join(format!("node-{}", &*target_name));

profile.create_gc_root(&path).await?;
} else {
job.noop("No context directory to create GC roots in".to_string())?;
}
Ok(())
}).await?;
}

drop(permit);

Ok(())
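
GC root creation also moves from one per-chunk job over a `ProfileMap` to one root per node under the context directory. A tiny sketch of the resulting path layout (the context directory shown is hypothetical; how the root is registered with Nix is left to `Profile::create_gc_root`):

```rust
use std::path::{Path, PathBuf};

fn per_node_gc_root(context_dir: &Path, node: &str) -> PathBuf {
    // Previously: one job created roots for the whole chunk under `.gcroots`.
    // Now each node's deployment creates its own root, e.g. `.gcroots/node-web01`.
    context_dir.join(".gcroots").join(format!("node-{}", node))
}

fn main() {
    let root = per_node_gc_root(Path::new("/etc/colmena"), "web01");
    println!("{}", root.display()); // /etc/colmena/.gcroots/node-web01
}
```
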
20 changes: 6 additions & 14 deletions src/nix/hive/eval.nix
@@ -460,19 +460,11 @@ let
deploymentConfigJsonSelected = names: toJSON
(listToAttrs (map (name: { inherit name; value = nodes.${name}.config.deployment; }) names));

buildAll = buildSelected nodeNames;
buildSelected = names: let
# Change in the order of the names should not cause a derivation to be created
selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel;
in derivation rec {
name = "colmena-${hive.meta.name}";
system = currentSystem;
json = toJSON (lib.attrsets.mapAttrs (k: v: toString v) selected);
builder = pkgs.writeScript "${name}.sh" ''
#!/bin/sh
echo "$json" > $out
'';
};
evalAll = evalSelected nodeNames;
evalSelected = names: let
selected = lib.filterAttrs (name: _: elem name names) toplevel;
drvs = lib.mapAttrs (k: v: v.drvPath) selected;
in drvs;

introspect = function: function {
inherit pkgs lib nodes;
@@ -481,7 +473,7 @@ in {
inherit
nodes toplevel
deploymentConfigJson deploymentConfigJsonSelected
buildAll buildSelected introspect;
evalAll evalSelected introspect;

meta = hive.meta;

32 changes: 19 additions & 13 deletions src/nix/hive/mod.rs
@@ -11,12 +11,12 @@ use validator::Validate;

use super::{
Flake,
StoreDerivation,
NixResult,
NodeName,
NodeConfig,
NodeFilter,
ProfileMap,
ProfileDerivation,
StorePath,
};
use super::deployment::TargetNode;
use super::NixCommand;
@@ -250,20 +250,24 @@ impl Hive {
/// Evaluation may take up a lot of memory, so we make it possible
/// to split up the evaluation process into chunks and run them
/// concurrently with other processes (e.g., build and apply).
pub async fn eval_selected(&self, nodes: &[NodeName], job: Option<JobHandle>) -> NixResult<StoreDerivation<ProfileMap>> {
pub async fn eval_selected(&self, nodes: &[NodeName], job: Option<JobHandle>) -> NixResult<HashMap<NodeName, ProfileDerivation>> {
let nodes_expr = SerializedNixExpresssion::new(nodes)?;

let expr = format!("hive.buildSelected {}", nodes_expr.expression());
let expr = format!("hive.evalSelected {}", nodes_expr.expression());

let command = self.nix_instantiate(&expr).instantiate_with_builders().await?;
let command = self.nix_instantiate(&expr)
.eval_with_builders().await?;
let mut execution = CommandExecution::new(command);
execution.set_job(job);

let path = execution.capture_store_path().await?;
let drv = path.into_derivation()
.expect("The result should be a store derivation");

Ok(drv)
execution.set_hide_stdout(true);

execution
.capture_json::<HashMap<NodeName, StorePath>>().await?
.into_iter().map(|(name, path)| {
let path = path.into_derivation()?;
Ok((name, path))
})
.collect()
}

/// Evaluates an expression using values from the configuration
@@ -374,8 +378,10 @@ impl<'hive> NixInstantiate<'hive> {

fn eval(self) -> Command {
let mut command = self.instantiate();
command.arg("--eval").arg("--json")
.arg("--read-write-mode"); // For cases involving IFD
command.arg("--eval").arg("--json").arg("--strict")
// Ensures the derivations are instantiated
// Required for system profile evaluation and IFD
.arg("--read-write-mode");
command
}

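
`eval_selected` now captures JSON from nix-instantiate instead of a single store path: `hive.evalSelected` maps each node name to the .drv path of its toplevel profile, and any path that is not a derivation is rejected with the new `NotADerivation` error. A rough sketch of that parsing step, with made-up store paths and serde_json standing in for the real `StorePath`/`ProfileDerivation` types:

```rust
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Roughly the shape of what `hive.evalSelected` produces (paths are made up).
    let output = r#"{
        "web01": "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-nixos-system-web01.drv",
        "db01": "/nix/store/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-nixos-system-db01.drv"
    }"#;

    let paths: HashMap<String, String> = serde_json::from_str(output)?;

    for (node, path) in &paths {
        // Mirrors the spirit of `StorePath::into_derivation`: anything that is
        // not a .drv is rejected (cf. the new `NotADerivation` error variant).
        if !path.ends_with(".drv") {
            return Err(format!("{}: {} is not a derivation", node, path).into());
        }
        println!("{} -> {}", node, path);
    }

    Ok(())
}
```
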
5 changes: 4 additions & 1 deletion src/nix/mod.rs
@@ -30,7 +30,7 @@ pub mod key;
pub use key::Key;

pub mod profile;
pub use profile::{Profile, ProfileMap};
pub use profile::{Profile, ProfileDerivation};

pub mod deployment;
pub use deployment::Goal;
@@ -78,6 +78,9 @@ pub enum NixError {
#[snafu(display("Failed to upload keys: {}", error))]
KeyError { error: key::KeyError },

#[snafu(display("Store path {:?} is not a derivation", store_path))]
NotADerivation { store_path: StorePath },

#[snafu(display("Invalid NixOS system profile"))]
InvalidProfile,

