-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
control: add
builder
service for building builds
The builder daemon periodically polls the database for a build to run. On finding one, it builds it in a local temporary directory using `flowctl api build`, and then tests it using `flowctl temp-data-plane` and `flowctl api test`. This commit conceptualizes each of these activities as "tasks", where log lines of each task are directly streamed into the build database as they occur. The build DB can then be its own source-of-truth for all of its outputs during the build process. We also preserve structured log information from the build process. Looking forward, activations and other background tasks will produce similar types of logs, and we can use the same mechanism to capture / persist / query them. For now, the build daemon is directly integrated into the control plane server. We'll probably want to separate these in the future. Issue #400
- Loading branch information
1 parent
1e95f72
commit 9b1c28e
Showing
10 changed files
with
343 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,310 @@ | ||
use crate::context::AppContext; | ||
use crate::models::{ | ||
builds::{Build, State}, | ||
id::Id, | ||
}; | ||
use crate::repo::builds::{dequeue_build, update_build_state}; | ||
|
||
use futures::{pin_mut, select, FutureExt}; | ||
use std::{ | ||
io::Write, | ||
path::{Path, PathBuf}, | ||
}; | ||
use tokio::io::AsyncBufReadExt; | ||
use tracing::{debug, error, info}; | ||
|
||
/// Error is the set of failures which can occur while dequeuing,
/// building, testing, and persisting a build.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Creation of the temporary build directory (or its sub-directory) failed.
    #[error("failed to create temporary build directory")]
    CreateDir(#[source] std::io::Error),
    /// Writing the catalog source file into the build directory failed.
    #[error("failed to create source catalog file")]
    CreateSource(#[source] std::io::Error),
    /// A spawned task (`build`, `temp-data-plane`, or `test`) failed with an
    /// I/O error — distinct from the task exiting with a non-zero status,
    /// which is reported through the build State instead.
    #[error("processing task {task:?} failed")]
    Task {
        task: String,
        #[source]
        err: std::io::Error,
    },
    /// Uploading the completed build database to the builds root failed.
    #[error("failed to persist build {id}")]
    PersistBuild {
        id: Id<Build>,
        #[source]
        src: crate::services::builds_root::BuildsRootError,
    },
    /// An error from the per-build SQLite logs database.
    #[error("sqlite database error")]
    Sqlite(#[from] rusqlite::Error),
    /// An error from the control-plane (Postgres via sqlx) database.
    #[error("control plane database error")]
    SqlxError(#[from] sqlx::Error),
}
|
||
pub async fn serve_builds<F>(ctx: AppContext, shutdown: F) -> Result<(), Error> | ||
where | ||
F: std::future::Future<Output = ()>, | ||
{ | ||
let shutdown = shutdown.fuse(); | ||
pin_mut!(shutdown); | ||
|
||
let mut backoff = std::time::Duration::ZERO; | ||
|
||
loop { | ||
let sleep = tokio::time::sleep(backoff).fuse(); | ||
pin_mut!(sleep); | ||
|
||
select! { | ||
_ = shutdown => { | ||
return Ok(()) | ||
} | ||
_ = &mut sleep => (), | ||
}; | ||
|
||
// Begin a |txn| which will scope a held advisory lock on `Build.id`. | ||
let mut txn = ctx.db().begin().await?; | ||
|
||
if let Some(build) = dequeue_build(&mut txn).await? { | ||
let Build { | ||
id, | ||
account_id, | ||
catalog, | ||
created_at, | ||
state: _, | ||
updated_at, | ||
} = build; | ||
|
||
let tmpdir = tempfile::TempDir::new().map_err(|e| Error::CreateDir(e))?; | ||
info!(%id, %account_id, tmpdir=?tmpdir.path(), %created_at, %updated_at, "processing dequeued build"); | ||
|
||
let (state, db_path) = process_build(id, catalog.unwrap().0, tmpdir.path()).await?; | ||
info!(?state, "processed build"); | ||
|
||
ctx.put_builds() | ||
.put_build(id, &db_path) | ||
.await | ||
.map_err(|src| Error::PersistBuild { id, src })?; | ||
info!(%id, "put build"); | ||
|
||
update_build_state(&mut txn, id, state).await?; | ||
txn.commit().await?; | ||
|
||
backoff = std::time::Duration::ZERO; | ||
} else { | ||
debug!("serve_builds found no build to dequeue. Sleeping..."); | ||
backoff = std::time::Duration::from_secs(5); | ||
} | ||
} | ||
} | ||
|
||
async fn process_build( | ||
id: Id<Build>, | ||
catalog: models::Catalog, | ||
tmp_dir: &Path, | ||
) -> Result<(State, PathBuf), Error> { | ||
// We perform the build under a ./builds/ subdirectory, which is a | ||
// specific sub-path expected by temp-data-plane underneath its | ||
// working temporary directory. This lets temp-data-plane use the | ||
// build database in-place. | ||
let builds_dir = tmp_dir.join("builds"); | ||
std::fs::create_dir(&builds_dir).map_err(|err| Error::CreateDir(err))?; | ||
|
||
// Write our catalog source file within the build directory. | ||
std::fs::File::create(&builds_dir.join(&format!("{}.flow.yaml", id))) | ||
.and_then(|mut f| { | ||
f.write_all( | ||
serde_json::to_string_pretty(&catalog) | ||
.expect("to always serialize a models::Catalog") | ||
.as_bytes(), | ||
) | ||
}) | ||
.map_err(|e| Error::CreateSource(e))?; | ||
|
||
let db_name = format!("{}", id); | ||
let db_path = builds_dir.join(&db_name); | ||
let db = rusqlite::Connection::open(&db_path)?; | ||
|
||
enable_wal_mode(&db)?; | ||
create_task_logs_table(&db)?; | ||
|
||
let build_task = run_task( | ||
tokio::process::Command::new("flowctl") | ||
.arg("api") | ||
.arg("build") | ||
.arg("--build-id") | ||
.arg(&db_name) | ||
.arg("--directory") | ||
.arg(&builds_dir) | ||
.arg("--fs-root") | ||
.arg(&builds_dir) | ||
.arg("--network") | ||
.arg(&crate::config::settings().application.connector_network) | ||
.arg("--source") | ||
.arg(format!("file:///{}.flow.yaml", id)) | ||
.arg("--source-type") | ||
.arg("catalog") | ||
.arg("--ts-package") | ||
.arg("--log.level=info") | ||
.arg("--log.format=color") | ||
.current_dir(tmp_dir), | ||
&db, | ||
"build", | ||
) | ||
.await | ||
.map_err(|err| Error::Task { | ||
task: "build".to_string(), | ||
err, | ||
})?; | ||
|
||
if !build_task.success() { | ||
return Ok(( | ||
State::BuildFailed { | ||
code: build_task.code(), | ||
}, | ||
db_path, | ||
)); | ||
} | ||
|
||
// Start a data-plane. It will use ${tmp_dir}/builds as its builds-root, | ||
// which we also used as the build directory, meaning the build database | ||
// is already in-place. | ||
let mut data_plane_task = tokio::process::Command::new("flowctl"); | ||
let data_plane_task = run_task( | ||
data_plane_task | ||
.arg("temp-data-plane") | ||
.arg("--network") | ||
.arg(&crate::config::settings().application.connector_network) | ||
.arg("--tempdir") | ||
.arg(tmp_dir) | ||
.arg("--unix-sockets") | ||
.arg("--log.level=info") | ||
.arg("--log.format=color") | ||
.current_dir(tmp_dir), | ||
&db, | ||
"temp-data-plane", | ||
) | ||
.fuse(); | ||
|
||
// Start the test runner. | ||
let mut test_task = tokio::process::Command::new("flowctl"); | ||
let test_task = run_task( | ||
test_task | ||
.arg("api") | ||
.arg("test") | ||
.arg("--build-id") | ||
.arg(&db_name) | ||
.arg("--broker.address") | ||
.arg(&format!( | ||
"unix://localhost/{}/gazette.sock", | ||
tmp_dir.as_os_str().to_string_lossy() | ||
)) | ||
.arg("--consumer.address") | ||
.arg(&format!( | ||
"unix://localhost/{}/consumer.sock", | ||
tmp_dir.as_os_str().to_string_lossy() | ||
)) | ||
.arg("--log.level=info") | ||
.arg("--log.format=color") | ||
.current_dir(tmp_dir), | ||
&db, | ||
"test", | ||
) | ||
.fuse(); | ||
|
||
// Drive the data-plane and test tasks, until tests complete. | ||
pin_mut!(data_plane_task, test_task); | ||
let test_task = select! { | ||
r = data_plane_task => { | ||
tracing::error!(?r, "test data-plane exited unexpectedly"); | ||
test_task.await // Wait for the test task to finish. | ||
} | ||
r = test_task => r, | ||
} | ||
.map_err(|err| Error::Task { | ||
task: "test".to_string(), | ||
err, | ||
})?; | ||
|
||
if !test_task.success() { | ||
return Ok(( | ||
State::TestFailed { | ||
code: test_task.code(), | ||
}, | ||
db_path, | ||
)); | ||
} | ||
|
||
Ok((State::Done, db_path)) | ||
} | ||
|
||
// run_task spawns the provided Command, capturing its stdout and stderr | ||
// into the provided logs database identified by |task|. | ||
async fn run_task( | ||
cmd: &mut tokio::process::Command, | ||
logs_db: &rusqlite::Connection, | ||
task: &str, | ||
) -> Result<std::process::ExitStatus, std::io::Error> { | ||
cmd | ||
// Pass through PATH, but remove all other environment variables. | ||
.env_clear() | ||
.envs(std::env::vars().filter(|&(ref k, _)| k == "PATH")) | ||
.kill_on_drop(true) | ||
.stdin(std::process::Stdio::null()) | ||
.stdout(std::process::Stdio::piped()) | ||
.stderr(std::process::Stdio::piped()); | ||
|
||
let mut child = cmd.spawn()?; | ||
|
||
let stdout = capture_task_logs(logs_db, task, 0, child.stdout.take().unwrap()); | ||
let stderr = capture_task_logs(logs_db, task, 1, child.stderr.take().unwrap()); | ||
|
||
let (_, _, exit) = futures::try_join!(stdout, stderr, child.wait())?; | ||
Ok(exit) | ||
} | ||
|
||
fn enable_wal_mode(db: &rusqlite::Connection) -> rusqlite::Result<()> { | ||
let mode: String = db.pragma_update_and_check(None, "journal_mode", "wal", |row| row.get(0))?; | ||
if mode != "wal" { | ||
Err(rusqlite::Error::UserFunctionError( | ||
format!("expected journal_mode to be wal, not {}", mode).into(), | ||
)) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
fn create_task_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> { | ||
db.execute_batch( | ||
r#" | ||
CREATE TABLE IF NOT EXISTS task_logs ( | ||
task TEXT NOT NULL, | ||
source INTEGER NOT NULL, -- 0 is stdout; 1 is stderr. | ||
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
line TEXT NOT NULL | ||
); | ||
"#, | ||
)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
// capture_task_logs consumes lines from the AsyncRead and adds each as a log | ||
// entry to a well-known `task_logs` table within the given SQLite database. | ||
// Each entry is identified by its task and source. | ||
// By convention, stdout is source=0 and stderr is source=1. | ||
// capture_task_logs consumes lines from the AsyncRead and adds each as a log
// entry to a well-known `task_logs` table within the given SQLite database.
// Each entry is identified by its task and source.
// By convention, stdout is source=0 and stderr is source=1.
async fn capture_task_logs<R>(
    db: &rusqlite::Connection,
    task: &str,
    source: i32,
    r: R,
) -> Result<(), std::io::Error>
where
    R: tokio::io::AsyncRead + Unpin,
{
    // Split the stream on b'\n'; each yielded segment excludes the newline.
    let mut splits = tokio::io::BufReader::new(r).split(b'\n');
    while let Some(split) = splits.next_segment().await? {
        // NOTE(review): |split| is a Vec<u8>, which rusqlite binds as a
        // BLOB even though the `task_logs.line` column is declared TEXT
        // (SQLite's flexible typing accepts this). Confirm whether readers
        // expect TEXT and a UTF-8 conversion is wanted here.
        db.execute(
            "INSERT INTO task_logs (task, source, line) VALUES (?, ?, ?);",
            rusqlite::params![task, source, split],
        )
        // Adapt rusqlite errors to std::io::Error, matching the error type
        // that run_task's try_join! over child I/O requires.
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
    }

    Ok(())
}
Oops, something went wrong.