Skip to content

Commit

Permalink
control: add builder service for building builds
Browse files Browse the repository at this point in the history
The builder daemon periodically polls the database for a build to run.
On finding one, it builds it in a local temporary directory using
`flowctl api build`, and then tests it using `flowctl temp-data-plane`
and `flowctl api test`.

This commit conceptualizes each of these activities as "tasks", where
log lines of each task are directly streamed into the build database as
they occur. The build DB can then be its own source-of-truth for all of
its outputs during the build process. We also preserve structured log
information from the build process.

Looking forward, activations and other background tasks will produce
similar types of logs, and we can use the same mechanism to capture /
persist / query them.

For now, the build daemon is directly integrated into the control plane
server. We'll probably want to separate these in the future.

Issue #400
  • Loading branch information
jgraettinger committed Mar 10, 2022
1 parent caa36da commit 90b2c7e
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 23 deletions.
4 changes: 1 addition & 3 deletions crates/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ where

// Output database path is implied from the configured directory and ID.
let output_path = directory.join(&config.build_id);
// Create or truncate the output database.
std::fs::write(&output_path, &[]).context("failed to create catalog database")?;

let db = rusqlite::Connection::open(&output_path).context("failed to open catalog database")?;

tables::persist_tables(&db, &all_tables.as_tables())
.context("failed to persist catalog tables")?;
tracing::info!(?output_path, "wrote build database");
Expand Down
15 changes: 11 additions & 4 deletions crates/control/src/cmd/serve.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use futures::TryFutureExt;
use std::net::TcpListener;

use crate::cmd::{async_runtime, ConfigArgs};
use crate::config;
use crate::context::AppContext;
pub use crate::services::builds_root::init_builds_root;
use crate::startup;
use crate::{shutdown, startup};

#[derive(clap::Args, Debug)]
pub struct Args {
Expand All @@ -29,10 +30,16 @@ async fn serve(listener: TcpListener) -> anyhow::Result<()> {
let (put_builds, fetch_builds) = init_builds_root(&config::settings().builds_root)?;
let ctx = AppContext::new(db, put_builds, fetch_builds);

let server = startup::run(listener, ctx)?;
// TODO(johnny): For now, we run the API server and builder daemon together.
// We'll probably want to separate and independently deploy & scale these.
let server = startup::run(listener, ctx.clone())?.map_err(Into::into);

// The server runs until it receives a shutdown signal.
server.await?;
let builder_daemon =
crate::services::builder::serve_builds(ctx.clone(), shutdown::signal()).map_err(Into::into);

// Run until the builder_daemon and server both exit.
let out: Result<_, anyhow::Error> = futures::try_join!(server, builder_daemon);
out?;

Ok(())
}
2 changes: 1 addition & 1 deletion crates/control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ pub mod middleware;
pub mod models;
pub mod repo;
pub mod services;
pub mod shutdown;
pub mod startup;

mod controllers;
mod error;
mod routes;
mod shutdown;
12 changes: 8 additions & 4 deletions crates/control/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use clap::Parser;
use flow_cli_common::LogArgs;
use futures::TryFutureExt;
use std::net::TcpListener;

use control::config;
use control::config::app_env::{self, AppEnv};
use control::context::AppContext;
use control::services::builds_root::init_builds_root;
use control::startup;
use control::{shutdown, startup};

/// Runs the control plane api server in development mode.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -36,10 +37,13 @@ async fn main() -> anyhow::Result<()> {
let (put_builds, fetch_builds) = init_builds_root(&settings.builds_root)?;
let ctx = AppContext::new(db, put_builds, fetch_builds);

let server = startup::run(listener, ctx)?;
let server = startup::run(listener, ctx.clone())?.map_err(Into::into);
let builder_daemon = control::services::builder::serve_builds(ctx.clone(), shutdown::signal())
.map_err(Into::into);

// The server runs until it receives a shutdown signal.
server.await?;
// Run until the builder_daemon and server both exit.
let out: Result<_, anyhow::Error> = futures::try_join!(server, builder_daemon);
out?;

Ok(())
}
310 changes: 310 additions & 0 deletions crates/control/src/services/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
use crate::context::AppContext;
use crate::models::{
builds::{Build, State},
id::Id,
};
use crate::repo::builds::{dequeue_build, update_build_state};

use futures::{pin_mut, select, FutureExt};
use std::{
io::Write,
path::{Path, PathBuf},
};
use tokio::io::AsyncBufReadExt;
use tracing::{debug, error, info};

/// Errors surfaced by the builder service while dequeuing, processing,
/// and persisting builds.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The temporary build directory, or the `builds` subdirectory beneath it,
/// could not be created.
#[error("failed to create temporary build directory")]
CreateDir(#[source] std::io::Error),
/// The catalog source file could not be created or written within the
/// build directory.
#[error("failed to create source catalog file")]
CreateSource(#[source] std::io::Error),
/// Running a task failed with an I/O error. `task` is the name the task
/// was run under (for example "build" or "test").
#[error("processing task {task:?} failed")]
Task {
task: String,
#[source]
err: std::io::Error,
},
/// The build database of build `id` could not be persisted to the
/// builds root.
#[error("failed to persist build {id}")]
PersistBuild {
id: Id<Build>,
#[source]
src: crate::services::builds_root::BuildsRootError,
},
/// An operation against a SQLite database failed.
#[error("sqlite database error")]
Sqlite(#[from] rusqlite::Error),
/// An operation against the control plane database failed.
#[error("control plane database error")]
SqlxError(#[from] sqlx::Error),
}

pub async fn serve_builds<F>(ctx: AppContext, shutdown: F) -> Result<(), Error>
where
F: std::future::Future<Output = ()>,
{
let shutdown = shutdown.fuse();
pin_mut!(shutdown);

let mut backoff = std::time::Duration::ZERO;

loop {
let sleep = tokio::time::sleep(backoff).fuse();
pin_mut!(sleep);

select! {
_ = shutdown => {
return Ok(())
}
_ = &mut sleep => (),
};

// Begin a |txn| which will scope a held advisory lock on `Build.id`.
let mut txn = ctx.db().begin().await?;

if let Some(build) = dequeue_build(&mut txn).await? {
let Build {
id,
account_id,
catalog,
created_at,
state: _,
updated_at,
} = build;

let tmpdir = tempfile::TempDir::new().map_err(|e| Error::CreateDir(e))?;
info!(%id, %account_id, tmpdir=?tmpdir.path(), %created_at, %updated_at, "processing dequeued build");

let (state, db_path) = process_build(id, catalog.unwrap().0, tmpdir.path()).await?;
info!(?state, "processed build");

ctx.put_builds()
.put_build(id, &db_path)
.await
.map_err(|src| Error::PersistBuild { id, src })?;
info!(%id, "put build");

update_build_state(&mut txn, id, state).await?;
txn.commit().await?;

backoff = std::time::Duration::ZERO;
} else {
debug!("serve_builds found no build to dequeue. Sleeping...");
backoff = std::time::Duration::from_secs(5);
}
}
}

async fn process_build(
id: Id<Build>,
catalog: models::Catalog,
tmp_dir: &Path,
) -> Result<(State, PathBuf), Error> {
// We perform the build under a ./builds/ subdirectory, which is a
// specific sub-path expected by temp-data-plane underneath its
// working temporary directory. This lets temp-data-plane use the
// build database in-place.
let builds_dir = tmp_dir.join("builds");
std::fs::create_dir(&builds_dir).map_err(|err| Error::CreateDir(err))?;

// Write our catalog source file within the build directory.
std::fs::File::create(&builds_dir.join(&format!("{}.flow.yaml", id)))
.and_then(|mut f| {
f.write_all(
serde_json::to_string_pretty(&catalog)
.expect("to always serialize a models::Catalog")
.as_bytes(),
)
})
.map_err(|e| Error::CreateSource(e))?;

let db_name = format!("{}", id);
let db_path = builds_dir.join(&db_name);
let db = rusqlite::Connection::open(&db_path)?;

enable_wal_mode(&db)?;
create_task_logs_table(&db)?;

let build_task = run_task(
tokio::process::Command::new("flowctl")
.arg("api")
.arg("build")
.arg("--build-id")
.arg(&db_name)
.arg("--directory")
.arg(&builds_dir)
.arg("--fs-root")
.arg(&builds_dir)
.arg("--network")
.arg(&crate::config::settings().application.connector_network)
.arg("--source")
.arg(format!("file:///{}.flow.yaml", id))
.arg("--source-type")
.arg("catalog")
.arg("--ts-package")
.arg("--log.level=info")
.arg("--log.format=color")
.current_dir(tmp_dir),
&db,
"build",
)
.await
.map_err(|err| Error::Task {
task: "build".to_string(),
err,
})?;

if !build_task.success() {
return Ok((
State::BuildFailed {
code: build_task.code(),
},
db_path,
));
}

// Start a data-plane. It will use ${tmp_dir}/builds as its builds-root,
// which we also used as the build directory, meaning the build database
// is already in-place.
let mut data_plane_task = tokio::process::Command::new("flowctl");
let data_plane_task = run_task(
data_plane_task
.arg("temp-data-plane")
.arg("--network")
.arg(&crate::config::settings().application.connector_network)
.arg("--tempdir")
.arg(tmp_dir)
.arg("--unix-sockets")
.arg("--log.level=info")
.arg("--log.format=color")
.current_dir(tmp_dir),
&db,
"temp-data-plane",
)
.fuse();

// Start the test runner.
let mut test_task = tokio::process::Command::new("flowctl");
let test_task = run_task(
test_task
.arg("api")
.arg("test")
.arg("--build-id")
.arg(&db_name)
.arg("--broker.address")
.arg(&format!(
"unix://localhost/{}/gazette.sock",
tmp_dir.as_os_str().to_string_lossy()
))
.arg("--consumer.address")
.arg(&format!(
"unix://localhost/{}/consumer.sock",
tmp_dir.as_os_str().to_string_lossy()
))
.arg("--log.level=info")
.arg("--log.format=color")
.current_dir(tmp_dir),
&db,
"test",
)
.fuse();

// Drive the data-plane and test tasks, until tests complete.
pin_mut!(data_plane_task, test_task);
let test_task = select! {
r = data_plane_task => {
tracing::error!(?r, "test data-plane exited unexpectedly");
test_task.await // Wait for the test task to finish.
}
r = test_task => r,
}
.map_err(|err| Error::Task {
task: "test".to_string(),
err,
})?;

if !test_task.success() {
return Ok((
State::TestFailed {
code: test_task.code(),
},
db_path,
));
}

Ok((State::Done, db_path))
}

// run_task spawns the provided Command, capturing its stdout and stderr
// into the provided logs database identified by |task|. It resolves with
// the exit status of the child once the child has exited and both of its
// output streams have been fully captured.
//
// The child runs with a cleared environment, except for PATH which is passed
// through when set. Its stdin is null, and it's killed if the returned
// future is dropped before completing.
//
// An error is returned if the child cannot be spawned or waited upon,
// or if capturing its logs fails.
async fn run_task(
    cmd: &mut tokio::process::Command,
    logs_db: &rusqlite::Connection,
    task: &str,
) -> Result<std::process::ExitStatus, std::io::Error> {
    cmd.env_clear()
        .kill_on_drop(true)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped());

    // Pass through PATH only. This reads PATH directly rather than filtering
    // std::env::vars(), which panics if *any* variable of the environment
    // has a key or value that isn't valid Unicode.
    if let Some(path) = std::env::var_os("PATH") {
        cmd.env("PATH", path);
    }

    let mut child = cmd.spawn()?;

    // Both handles are present because both streams were configured as piped.
    let child_stdout = child
        .stdout
        .take()
        .expect("child stdout is piped and not yet taken");
    let child_stderr = child
        .stderr
        .take()
        .expect("child stderr is piped and not yet taken");

    // By convention, stdout is source=0 and stderr is source=1.
    let stdout = capture_task_logs(logs_db, task, 0, child_stdout);
    let stderr = capture_task_logs(logs_db, task, 1, child_stderr);

    let (_, _, exit) = futures::try_join!(stdout, stderr, child.wait())?;
    Ok(exit)
}

// enable_wal_mode sets the journal_mode of |db| to "wal", and verifies
// that the database reports "wal" as its resulting mode. Any other
// reported mode is returned as an error.
fn enable_wal_mode(db: &rusqlite::Connection) -> rusqlite::Result<()> {
    let journal_mode: String =
        db.pragma_update_and_check(None, "journal_mode", "wal", |row| row.get(0))?;

    if journal_mode == "wal" {
        return Ok(());
    }

    Err(rusqlite::Error::UserFunctionError(
        format!("expected journal_mode to be wal, not {}", journal_mode).into(),
    ))
}

// create_task_logs_table creates the `task_logs` table within |db|,
// if it doesn't exist already. Each row is one line of output from a task,
// with a timestamp that defaults to the time of insertion.
fn create_task_logs_table(db: &rusqlite::Connection) -> rusqlite::Result<()> {
    const CREATE_TASK_LOGS: &str = r#"
CREATE TABLE IF NOT EXISTS task_logs (
task TEXT NOT NULL,
source INTEGER NOT NULL, -- 0 is stdout; 1 is stderr.
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
line TEXT NOT NULL
);
"#;

    db.execute_batch(CREATE_TASK_LOGS)
}

// capture_task_logs consumes lines from the AsyncRead and adds each as a log
// entry to a well-known `task_logs` table within the given SQLite database.
// Each entry is identified by its task and source.
// By convention, stdout is source=0 and stderr is source=1.
//
// It resolves once the reader is exhausted. An error is returned if reading
// fails, or if a line cannot be inserted; database errors are wrapped as
// std::io::Error of kind Other.
async fn capture_task_logs<R>(
    db: &rusqlite::Connection,
    task: &str,
    source: i32,
    r: R,
) -> Result<(), std::io::Error>
where
    R: tokio::io::AsyncRead + Unpin,
{
    const INSERT_LOG_LINE: &str = "INSERT INTO task_logs (task, source, line) VALUES (?, ?, ?);";

    // NOTE(review): segments are split on raw bytes rather than decoded as
    // UTF-8, so `line` is presumably bound as a BLOB even though the column
    // is declared TEXT -- confirm that readers of task_logs expect this.
    let mut segments = tokio::io::BufReader::new(r).split(b'\n');

    while let Some(line) = segments.next_segment().await? {
        // Use the connection's statement cache so that the INSERT is compiled
        // once, rather than being re-parsed for every line of output.
        // The cached statement is returned to the cache before the next await.
        db.prepare_cached(INSERT_LOG_LINE)
            .and_then(|mut insert| insert.execute(rusqlite::params![task, source, line]))
            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
    }

    Ok(())
}

0 comments on commit 90b2c7e

Please sign in to comment.