Skip to content

Commit

Permalink
control: builder cleanups from review
Browse files Browse the repository at this point in the history
* Use new url_for pattern
* State::Done => State::Success
* Terminology: "task" => "job" for background jobs of the control plane.
  • Loading branch information
jgraettinger committed Mar 11, 2022
1 parent d092cf3 commit 5093c3b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 56 deletions.
3 changes: 0 additions & 3 deletions crates/control/src/controllers/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ pub async fn index(
Extension(ctx): Extension<AppContext>,
CurrentAccount(account): CurrentAccount,
) -> Result<impl IntoResponse, AppError> {
tracing::debug!(?account.id, "entered index");
let builds = builds_repo::fetch_for_account(ctx.db(), account.id).await?;

Ok((StatusCode::OK, view::index(builds)))
}

Expand All @@ -28,7 +26,6 @@ pub async fn create(
Json(catalog): Json<models::Catalog>,
) -> Result<impl IntoResponse, AppError> {
let build = builds_repo::insert(ctx.db(), catalog, account.id).await?;

Ok((StatusCode::CREATED, view::create(build)))
}

Expand Down
10 changes: 3 additions & 7 deletions crates/control/src/controllers/builds/routes.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use crate::config::settings;
use crate::controllers::url_for;
use crate::models::{builds::Build, id::Id};

pub fn index() -> String {
prefixed("/builds")
url_for("/builds")
}

pub fn show(build_id: Id<Build>) -> String {
prefixed(format!("/builds/{}", build_id.to_string()))
}

fn prefixed(path: impl Into<String>) -> String {
format!("http://{}{}", settings().application.address(), path.into())
url_for(format!("/builds/{}", build_id.to_string()))
}
4 changes: 2 additions & 2 deletions crates/control/src/models/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::models::id::Id;
#[serde(rename_all = "camelCase", tag = "type")]
pub enum State {
Queued,
Done,
Success,
BuildFailed { code: Option<i32> },
TestFailed { code: Option<i32> },
}
Expand All @@ -39,7 +39,7 @@ impl fmt::Debug for Build {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Build")
.field("account_id", &self.account_id)
// catalog is omitted.
.field("catalog", &"<omitted>".to_string())
.field("created_at", &self.created_at)
.field("id", &self.id)
.field("state", &self.state)
Expand Down
87 changes: 44 additions & 43 deletions crates/control/src/services/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pub enum Error {
CreateDir(#[source] std::io::Error),
#[error("failed to create source catalog file")]
CreateSource(#[source] std::io::Error),
#[error("processing task {task:?} failed")]
Task {
task: String,
#[error("processing job {job:?} failed")]
Job {
job: String,
#[source]
err: std::io::Error,
},
Expand Down Expand Up @@ -121,9 +121,9 @@ async fn process_build(
let db = rusqlite::Connection::open(&db_path)?;

enable_wal_mode(&db)?;
create_task_logs_table(&db)?;
create_job_logs_table(&db)?;

let build_task = run_task(
let build_job = run_job(
tokio::process::Command::new("flowctl")
.arg("api")
.arg("build")
Expand All @@ -147,15 +147,15 @@ async fn process_build(
"build",
)
.await
.map_err(|err| Error::Task {
task: "build".to_string(),
.map_err(|err| Error::Job {
job: "build".to_string(),
err,
})?;

if !build_task.success() {
if !build_job.success() {
return Ok((
State::BuildFailed {
code: build_task.code(),
code: build_job.code(),
},
db_path,
));
Expand All @@ -164,9 +164,9 @@ async fn process_build(
// Start a data-plane. It will use ${tmp_dir}/builds as its builds-root,
// which we also used as the build directory, meaning the build database
// is already in-place.
let mut data_plane_task = tokio::process::Command::new("flowctl");
let data_plane_task = run_task(
data_plane_task
let mut data_plane_job = tokio::process::Command::new("flowctl");
let data_plane_job = run_job(
data_plane_job
.arg("temp-data-plane")
.arg("--network")
.arg(&crate::config::settings().application.connector_network)
Expand All @@ -182,9 +182,9 @@ async fn process_build(
.fuse();

// Start the test runner.
let mut test_task = tokio::process::Command::new("flowctl");
let test_task = run_task(
test_task
let mut test_job = tokio::process::Command::new("flowctl");
let test_job = run_job(
test_job
.arg("api")
.arg("test")
.arg("--build-id")
Expand All @@ -207,38 +207,38 @@ async fn process_build(
)
.fuse();

// Drive the data-plane and test tasks, until tests complete.
pin_mut!(data_plane_task, test_task);
let test_task = select! {
r = data_plane_task => {
// Drive the data-plane and test jobs, until tests complete.
pin_mut!(data_plane_job, test_job);
let test_job = select! {
r = data_plane_job => {
tracing::error!(?r, "test data-plane exited unexpectedly");
test_task.await // Wait for the test task to finish.
test_job.await // Wait for the test job to finish.
}
r = test_task => r,
r = test_job => r,
}
.map_err(|err| Error::Task {
task: "test".to_string(),
.map_err(|err| Error::Job {
job: "test".to_string(),
err,
})?;

if !test_task.success() {
if !test_job.success() {
return Ok((
State::TestFailed {
code: test_task.code(),
code: test_job.code(),
},
db_path,
));
}

Ok((State::Done, db_path))
Ok((State::Success, db_path))
}

// run_task spawns the provided Command, capturing its stdout and stderr
// into the provided logs database identified by |task|.
async fn run_task(
// run_job spawns the provided Command, capturing its stdout and stderr
// into the provided logs database identified by |job|.
async fn run_job(
cmd: &mut tokio::process::Command,
logs_db: &rusqlite::Connection,
task: &str,
job: &str,
) -> Result<std::process::ExitStatus, std::io::Error> {
cmd
// Pass through PATH, but remove all other environment variables.
Expand All @@ -251,8 +251,8 @@ async fn run_task(

let mut child = cmd.spawn()?;

let stdout = capture_task_logs(logs_db, task, 0, child.stdout.take().unwrap());
let stderr = capture_task_logs(logs_db, task, 1, child.stderr.take().unwrap());
let stdout = capture_job_logs(logs_db, job, 1, child.stdout.take().unwrap());
let stderr = capture_job_logs(logs_db, job, 2, child.stderr.take().unwrap());

let (_, _, exit) = futures::try_join!(stdout, stderr, child.wait())?;
Ok(exit)
Expand All @@ -269,11 +269,11 @@ fn enable_wal_mode(db: &rusqlite::Connection) -> rusqlite::Result<()> {
}
}

fn create_task_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
fn create_job_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
db.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS task_logs (
task TEXT NOT NULL,
CREATE TABLE IF NOT EXISTS job_logs (
job TEXT NOT NULL,
source INTEGER NOT NULL, -- 0 is stdout; 1 is stderr.
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
line TEXT NOT NULL
Expand All @@ -284,13 +284,14 @@ fn create_task_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
Ok(())
}

// capture_task_logs consumes lines from the AsyncRead and adds each as a log
// entry to a well-known `task_logs` table within the given SQLite database.
// Each entry is identified by its task and source.
// By convention, stdout is source=0 and stderr is source=1.
async fn capture_task_logs<R>(
// capture_job_logs consumes lines from the AsyncRead and adds each as a log
// entry to a well-known `job_logs` table within the given SQLite database.
// Each entry is identified by its job name and source.
// By convention, stdout is source=1 and stderr is source=2.
// TODO(johnny): Consider locking down `source` with an enum.
async fn capture_job_logs<R>(
db: &rusqlite::Connection,
task: &str,
job: &str,
source: i32,
r: R,
) -> Result<(), std::io::Error>
Expand All @@ -300,8 +301,8 @@ where
let mut splits = tokio::io::BufReader::new(r).split(b'\n');
while let Some(split) = splits.next_segment().await? {
db.execute(
"INSERT INTO task_logs (task, source, line) VALUES (?, ?, ?);",
rusqlite::params![task, source, split],
"INSERT INTO job_logs (job, source, line) VALUES (?, ?, ?);",
rusqlite::params![job, source, split],
)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/control/tests/it/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async fn dequeue_builds_test() {
std::mem::drop(txn_3);

// Second transaction updates its build and commits.
update_build_state(&mut txn_2, build_2.id, State::Done)
update_build_state(&mut txn_2, build_2.id, State::Success)
.await
.unwrap();
txn_2.commit().await.unwrap();
Expand Down

0 comments on commit 5093c3b

Please sign in to comment.