Skip to content

Commit

Permalink
squash into "control: add builder service for building builds"
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Mar 11, 2022
1 parent 734cd83 commit c00fa97
Showing 1 changed file with 43 additions and 42 deletions.
85 changes: 43 additions & 42 deletions crates/control/src/services/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pub enum Error {
CreateDir(#[source] std::io::Error),
#[error("failed to create source catalog file")]
CreateSource(#[source] std::io::Error),
#[error("processing task {task:?} failed")]
Task {
task: String,
#[error("processing job {job:?} failed")]
Job {
job: String,
#[source]
err: std::io::Error,
},
Expand Down Expand Up @@ -121,9 +121,9 @@ async fn process_build(
let db = rusqlite::Connection::open(&db_path)?;

enable_wal_mode(&db)?;
create_task_logs_table(&db)?;
create_job_logs_table(&db)?;

let build_task = run_task(
let build_job = run_job(
tokio::process::Command::new("flowctl")
.arg("api")
.arg("build")
Expand All @@ -147,15 +147,15 @@ async fn process_build(
"build",
)
.await
.map_err(|err| Error::Task {
task: "build".to_string(),
.map_err(|err| Error::Job {
job: "build".to_string(),
err,
})?;

if !build_task.success() {
if !build_job.success() {
return Ok((
State::BuildFailed {
code: build_task.code(),
code: build_job.code(),
},
db_path,
));
Expand All @@ -164,9 +164,9 @@ async fn process_build(
// Start a data-plane. It will use ${tmp_dir}/builds as its builds-root,
// which we also used as the build directory, meaning the build database
// is already in-place.
let mut data_plane_task = tokio::process::Command::new("flowctl");
let data_plane_task = run_task(
data_plane_task
let mut data_plane_job = tokio::process::Command::new("flowctl");
let data_plane_job = run_job(
data_plane_job
.arg("temp-data-plane")
.arg("--network")
.arg(&crate::config::settings().application.connector_network)
Expand All @@ -182,9 +182,9 @@ async fn process_build(
.fuse();

// Start the test runner.
let mut test_task = tokio::process::Command::new("flowctl");
let test_task = run_task(
test_task
let mut test_job = tokio::process::Command::new("flowctl");
let test_job = run_job(
test_job
.arg("api")
.arg("test")
.arg("--build-id")
Expand All @@ -207,24 +207,24 @@ async fn process_build(
)
.fuse();

// Drive the data-plane and test tasks, until tests complete.
pin_mut!(data_plane_task, test_task);
let test_task = select! {
r = data_plane_task => {
// Drive the data-plane and test jobs, until tests complete.
pin_mut!(data_plane_job, test_job);
let test_job = select! {
r = data_plane_job => {
tracing::error!(?r, "test data-plane exited unexpectedly");
test_task.await // Wait for the test task to finish.
test_job.await // Wait for the test job to finish.
}
r = test_task => r,
r = test_job => r,
}
.map_err(|err| Error::Task {
task: "test".to_string(),
.map_err(|err| Error::Job {
job: "test".to_string(),
err,
})?;

if !test_task.success() {
if !test_job.success() {
return Ok((
State::TestFailed {
code: test_task.code(),
code: test_job.code(),
},
db_path,
));
Expand All @@ -233,12 +233,12 @@ async fn process_build(
Ok((State::Success, db_path))
}

// run_task spawns the provided Command, capturing its stdout and stderr
// into the provided logs database identified by |task|.
async fn run_task(
// run_job spawns the provided Command, capturing its stdout and stderr
// into the provided logs database identified by |job|.
async fn run_job(
cmd: &mut tokio::process::Command,
logs_db: &rusqlite::Connection,
task: &str,
job: &str,
) -> Result<std::process::ExitStatus, std::io::Error> {
cmd
// Pass through PATH, but remove all other environment variables.
Expand All @@ -251,8 +251,8 @@ async fn run_task(

let mut child = cmd.spawn()?;

let stdout = capture_task_logs(logs_db, task, 0, child.stdout.take().unwrap());
let stderr = capture_task_logs(logs_db, task, 1, child.stderr.take().unwrap());
let stdout = capture_job_logs(logs_db, job, 1, child.stdout.take().unwrap());
let stderr = capture_job_logs(logs_db, job, 2, child.stderr.take().unwrap());

let (_, _, exit) = futures::try_join!(stdout, stderr, child.wait())?;
Ok(exit)
Expand All @@ -269,11 +269,11 @@ fn enable_wal_mode(db: &rusqlite::Connection) -> rusqlite::Result<()> {
}
}

fn create_task_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
fn create_job_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
db.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS task_logs (
task TEXT NOT NULL,
CREATE TABLE IF NOT EXISTS job_logs (
job TEXT NOT NULL,
source INTEGER NOT NULL, -- 0 is stdout; 1 is stderr.
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
line TEXT NOT NULL
Expand All @@ -284,13 +284,14 @@ fn create_task_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
Ok(())
}

// capture_task_logs consumes lines from the AsyncRead and adds each as a log
// entry to a well-known `task_logs` table within the given SQLite database.
// Each entry is identified by its task and source.
// By convention, stdout is source=0 and stderr is source=1.
async fn capture_task_logs<R>(
// capture_job_logs consumes lines from the AsyncRead and adds each as a log
// entry to a well-known `job_logs` table within the given SQLite database.
// Each entry is identified by its job name and source.
// By convention, stdout is source=1 and stderr is source=2.
// TODO(johnny): Consider locking down `source` with an enum.
async fn capture_job_logs<R>(
db: &rusqlite::Connection,
task: &str,
job: &str,
source: i32,
r: R,
) -> Result<(), std::io::Error>
Expand All @@ -300,8 +301,8 @@ where
let mut splits = tokio::io::BufReader::new(r).split(b'\n');
while let Some(split) = splits.next_segment().await? {
db.execute(
"INSERT INTO task_logs (task, source, line) VALUES (?, ?, ?);",
rusqlite::params![task, source, split],
"INSERT INTO job_logs (job, source, line) VALUES (?, ?, ?);",
rusqlite::params![job, source, split],
)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
}
Expand Down

0 comments on commit c00fa97

Please sign in to comment.