From 5d7c4bbe84873bec2b0b4620a49afef026a33eb3 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Thu, 14 Jul 2022 15:17:05 -0700 Subject: [PATCH 1/7] Rename provision_task to janus_cli, split operations out to subcommands. --- janus_server/Cargo.toml | 2 +- .../src/bin/aggregation_job_creator.rs | 2 +- .../src/bin/aggregation_job_driver.rs | 2 +- janus_server/src/bin/aggregator.rs | 2 +- janus_server/src/bin/collect_job_driver.rs | 2 +- janus_server/src/bin/janus_cli.rs | 215 ++++++++++++++++++ janus_server/src/bin/provision_task.rs | 212 ----------------- janus_server/src/binary_utils.rs | 12 +- janus_server/src/datastore.rs | 9 +- 9 files changed, 229 insertions(+), 229 deletions(-) create mode 100644 janus_server/src/bin/janus_cli.rs delete mode 100644 janus_server/src/bin/provision_task.rs diff --git a/janus_server/Cargo.toml b/janus_server/Cargo.toml index 91eede9db..e09ffbfe6 100644 --- a/janus_server/Cargo.toml +++ b/janus_server/Cargo.toml @@ -15,6 +15,7 @@ test-util = ["janus_core/test-util", "dep:lazy_static", "dep:testcontainers"] [dependencies] anyhow = "1" +assert_matches = "1" atty = "0.2" base64 = "0.13.0" bytes = "1.1.0" @@ -62,7 +63,6 @@ uuid = { version = "1.1.2", features = ["v4"] } warp = { version = "^0.3", features = ["tls"] } [dev-dependencies] -assert_matches = "1" hex = { version = "0.4.3", features = ["serde"] } hyper = "0.14.19" janus_server = { path = ".", features = ["test-util"] } diff --git a/janus_server/src/bin/aggregation_job_creator.rs b/janus_server/src/bin/aggregation_job_creator.rs index ae211c547..bd3c32290 100644 --- a/janus_server/src/bin/aggregation_job_creator.rs +++ b/janus_server/src/bin/aggregation_job_creator.rs @@ -9,7 +9,7 @@ use structopt::StructOpt; #[tokio::main] async fn main() -> anyhow::Result<()> { - janus_main::(RealClock::default(), |ctx| async move { + janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { // Start creating aggregation jobs. Arc::new(AggregationJobCreator::new( ctx.datastore, diff --git a/janus_server/src/bin/aggregation_job_driver.rs b/janus_server/src/bin/aggregation_job_driver.rs index 8ec24ecc4..580948017 100644 --- a/janus_server/src/bin/aggregation_job_driver.rs +++ b/janus_server/src/bin/aggregation_job_driver.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { "/aggregation_job_driver", ); - janus_main::(RealClock::default(), |ctx| async move { + janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { let meter = opentelemetry::global::meter("aggregation_job_driver"); let datastore = Arc::new(ctx.datastore); let aggregation_job_driver = Arc::new(AggregationJobDriver::new( diff --git a/janus_server/src/bin/aggregator.rs b/janus_server/src/bin/aggregator.rs index 96d36394d..f686433f2 100644 --- a/janus_server/src/bin/aggregator.rs +++ b/janus_server/src/bin/aggregator.rs @@ -13,7 +13,7 @@ use tracing::info; #[tokio::main] async fn main() -> Result<()> { - janus_main::(RealClock::default(), |ctx| async move { + janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { let shutdown_signal = setup_signal_handler().context("failed to register SIGTERM signal handler")?; diff --git a/janus_server/src/bin/collect_job_driver.rs b/janus_server/src/bin/collect_job_driver.rs index 14ad821ff..257b344d5 100644 --- a/janus_server/src/bin/collect_job_driver.rs +++ b/janus_server/src/bin/collect_job_driver.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { "/collect_job_driver" ); - janus_main::(RealClock::default(), |ctx| async move { + janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { let meter = opentelemetry::global::meter("collect_job_driver"); let datastore = Arc::new(ctx.datastore); let collect_job_driver = Arc::new(CollectJobDriver::new( diff --git a/janus_server/src/bin/janus_cli.rs b/janus_server/src/bin/janus_cli.rs new file mode 100644 index 000000000..9570b61e2 --- /dev/null +++ b/janus_server/src/bin/janus_cli.rs @@ -0,0 +1,215 @@ +use anyhow::{Context, Result}; +use deadpool_postgres::Pool; +use janus_core::time::{Clock, RealClock}; +use janus_server::{ + binary_utils::{janus_main, BinaryContext, BinaryOptions, CommonBinaryOptions}, + config::{BinaryConfig, CommonConfig}, + datastore::{self, Datastore}, + task::Task, +}; +use serde::{Deserialize, Serialize}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; +use structopt::StructOpt; +use tokio::fs; +use tracing::info; + +static SCHEMA: &str = include_str!("../../../db/schema.sql"); + +#[tokio::main] +async fn main() -> Result<()> { + janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move { + ctx.options.cmd.execute(&ctx).await + }) + .await +} + +async fn write_schema(pool: &Pool) -> Result<()> { + info!("Writing database schema"); + let db_client = pool.get().await.context("couldn't get database client")?; + db_client + .batch_execute(SCHEMA) + .await + .context("couldn't write database schema")?; + Ok(()) +} + +async fn provision_tasks(datastore: &Datastore, tasks_file: &Path) -> Result<()> { + // Read tasks file. + info!("Reading tasks file"); + let tasks: Vec = { + let task_file_contents = fs::read_to_string(tasks_file) + .await + .with_context(|| format!("couldn't read tasks file {:?}", tasks_file))?; + serde_yaml::from_str(&task_file_contents) + .with_context(|| format!("couldn't parse tasks file {:?}", tasks_file))? + }; + + // Write all tasks requested. + let tasks = Arc::new(tasks); + info!(task_count = tasks.len(), "Writing tasks"); + datastore + .run_tx(|tx| { + let tasks = Arc::clone(&tasks); + Box::pin(async move { + for task in tasks.iter() { + // We attempt to delete the task, but ignore "task not found" errors since + // the task not existing is an OK outcome too. + match tx.delete_task(task.id).await { + Ok(_) | Err(datastore::Error::MutationTargetNotFound) => (), + err => err?, + } + + tx.put_task(task).await?; + } + Ok(()) + }) + }) + .await + .context("couldn't write tasks") +} + +#[derive(Debug, StructOpt)] +#[structopt( + name = "janus-provision-task", + about = "Janus `provision task` command", + rename_all = "kebab-case", + version = env!("CARGO_PKG_VERSION"), +)] +struct Options { + #[structopt(flatten)] + common: CommonBinaryOptions, + + #[structopt(subcommand)] + cmd: Command, +} + +impl BinaryOptions for Options { + fn common_options(&self) -> &CommonBinaryOptions { + &self.common + } +} + +#[derive(Debug, StructOpt)] +enum Command { + WriteSchema, + ProvisionTasks { + /// A YAML file containing a list of tasks to be written. Existing tasks (matching by task + /// ID) will be overwritten. + tasks_file: PathBuf, + }, +} + +impl Command { + async fn execute(&self, ctx: &BinaryContext) -> Result<()> { + match self { + Command::WriteSchema => write_schema(&ctx.pool).await, + Command::ProvisionTasks { tasks_file } => { + provision_tasks(&ctx.datastore, tasks_file).await + } + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +struct Config { + #[serde(flatten)] + common_config: CommonConfig, +} + +impl BinaryConfig for Config { + fn common_config(&mut self) -> &mut CommonConfig { + &mut self.common_config + } +} + +#[cfg(test)] +mod tests { + use super::Config; + use janus_core::{ + message::{Role, TaskId}, + task::VdafInstance, + time::RealClock, + }; + use janus_server::{ + config::test_util::{ + generate_db_config, generate_metrics_config, generate_trace_config, roundtrip_encoding, + }, + config::CommonConfig, + datastore::test_util::{ephemeral_datastore, ephemeral_db_handle}, + task::test_util::new_dummy_task, + }; + use std::{collections::HashMap, io::Write}; + use tempfile::NamedTempFile; + + #[tokio::test] + async fn write_schema() { + let db_handle = ephemeral_db_handle(); + let ds = db_handle.datastore(RealClock::default()); + + // Verify that the query we will run later returns an error if there is no database schema written. + ds.run_tx(|tx| Box::pin(async move { tx.get_tasks().await })) + .await + .unwrap_err(); + + // Run the program logic. + super::write_schema(&db_handle.pool()).await.unwrap(); + + // Verify that the schema was written (by running a query that would fail if it weren't). + ds.run_tx(|tx| Box::pin(async move { tx.get_tasks().await })) + .await + .unwrap(); + } + + #[tokio::test] + async fn provision_tasks() { + let tasks = Vec::from([ + new_dummy_task( + TaskId::random(), + VdafInstance::Prio3Aes128Count.into(), + Role::Leader, + ), + new_dummy_task( + TaskId::random(), + VdafInstance::Prio3Aes128Sum { bits: 64 }.into(), + Role::Helper, + ), + ]); + + let (ds, _db_handle) = ephemeral_datastore(RealClock::default()).await; + + // Write tasks to a temporary file. + let mut tasks_file = NamedTempFile::new().unwrap(); + tasks_file + .write_all(serde_yaml::to_string(&tasks).unwrap().as_ref()) + .unwrap(); + let tasks_path = tasks_file.into_temp_path(); + + // Run the program logic. + super::provision_tasks(&ds, &tasks_path).await.unwrap(); + + // Verify that the expected tasks were written. + let want_tasks: HashMap<_, _> = tasks.into_iter().map(|task| (task.id, task)).collect(); + let got_tasks = ds + .run_tx(|tx| Box::pin(async move { tx.get_tasks().await })) + .await + .unwrap() + .into_iter() + .map(|task| (task.id, task)) + .collect(); + assert_eq!(want_tasks, got_tasks); + } + + #[test] + fn roundtrip_config() { + roundtrip_encoding(Config { + common_config: CommonConfig { + database: generate_db_config(), + logging_config: generate_trace_config(), + metrics_config: generate_metrics_config(), + }, + }) + } +} diff --git a/janus_server/src/bin/provision_task.rs b/janus_server/src/bin/provision_task.rs deleted file mode 100644 index 26f779b31..000000000 --- a/janus_server/src/bin/provision_task.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::sync::Arc; - -use anyhow::Result; -use janus_core::time::{Clock, RealClock}; -use janus_server::{ - binary_utils::{janus_main, BinaryContext, BinaryOptions, CommonBinaryOptions}, - config::{BinaryConfig, CommonConfig}, - datastore, - task::Task, -}; -use serde::{Deserialize, Serialize}; -use structopt::StructOpt; -use tracing::info; - -static SCHEMA: &str = include_str!("../../../db/schema.sql"); - -#[tokio::main] -async fn main() -> Result<()> { - janus_main::(RealClock::default(), run).await -} - -async fn run(ctx: BinaryContext) -> Result<()> { - // Try to write the DB schema, if requested. - if ctx.config.write_schema { - info!("Writing database schema"); - let db_client = ctx.pool.get().await?; - db_client.batch_execute(SCHEMA).await?; - } - - // Write all tasks requested. - if !ctx.config.tasks.is_empty() { - let tasks = Arc::new(ctx.config.tasks); - info!(task_count = tasks.len(), "Writing tasks"); - ctx.datastore - .run_tx(|tx| { - let tasks = Arc::clone(&tasks); - Box::pin(async move { - for task in tasks.iter() { - // We attempt to delete the task, but ignore "task not found" errors since - // the task not existing is an OK outcome too. - match tx.delete_task(task.id).await { - Ok(_) | Err(datastore::Error::MutationTargetNotFound) => (), - err => err?, - } - - tx.put_task(task).await?; - } - Ok(()) - }) - }) - .await?; - } - - Ok(()) -} - -#[derive(Debug, StructOpt)] -#[structopt( - name = "janus-provision-task", - about = "Janus `provision task` command", - rename_all = "kebab-case", - version = env!("CARGO_PKG_VERSION"), -)] -struct Options { - #[structopt(flatten)] - common: CommonBinaryOptions, -} - -impl BinaryOptions for Options { - fn common_options(&self) -> &CommonBinaryOptions { - &self.common - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -struct Config { - #[serde(flatten)] - common_config: CommonConfig, - - /// If set to true, always attempt to write the schema. If set to false, never attempt to write - /// the schema. - #[serde(default)] - write_schema: bool, - - /// A list of tasks to be written. Existing tasks (matching by task ID) will be overwritten. - tasks: Vec, -} - -impl BinaryConfig for Config { - fn common_config(&mut self) -> &mut CommonConfig { - &mut self.common_config - } -} - -#[cfg(test)] -mod tests { - use super::{run, Config}; - use janus_core::{ - message::{Role, TaskId}, - task::VdafInstance, - time::RealClock, - }; - use janus_server::{ - binary_utils::BinaryContext, - config::CommonConfig, - config::{ - test_util::{ - generate_db_config, generate_metrics_config, generate_trace_config, - roundtrip_encoding, - }, - DbConfig, - }, - datastore::test_util::ephemeral_db_handle, - metrics::MetricsConfiguration, - task::test_util::new_dummy_task, - trace::TraceConfiguration, - }; - use reqwest::Url; - use std::collections::HashMap; - - #[tokio::test] - async fn provision_task() { - for (write_schema, tasks) in [ - ( - false, - Vec::from([ - new_dummy_task( - TaskId::random(), - VdafInstance::Prio3Aes128Count.into(), - Role::Leader, - ), - new_dummy_task( - TaskId::random(), - VdafInstance::Prio3Aes128Sum { bits: 64 }.into(), - Role::Helper, - ), - ]), - ), - ( - true, - Vec::from([ - new_dummy_task( - TaskId::random(), - VdafInstance::Prio3Aes128Count.into(), - Role::Leader, - ), - new_dummy_task( - TaskId::random(), - VdafInstance::Prio3Aes128Sum { bits: 64 }.into(), - Role::Helper, - ), - ]), - ), - ] { - let db_handle = ephemeral_db_handle(); - if !write_schema { - // If we aren't going to ask the program logic to write the schema, we write it - // ourselves to simulate it being already written. - db_handle.write_schema().await; - } - - // Run the program logic with the specified parameters. - run(BinaryContext { - clock: RealClock::default(), - config: Config { - common_config: CommonConfig { - database: DbConfig { - url: Url::parse("http://db_endpoint").unwrap(), - }, - logging_config: TraceConfiguration::default(), - metrics_config: MetricsConfiguration::default(), - }, - write_schema, - tasks: tasks.clone(), - }, - datastore: db_handle.datastore(RealClock::default()), - pool: db_handle.pool(), - }) - .await - .unwrap(); - - // Check that the expected tasks were written. - let ds = db_handle.datastore(RealClock::default()); - let want_tasks: HashMap<_, _> = tasks.into_iter().map(|task| (task.id, task)).collect(); - let got_tasks = ds - .run_tx(|tx| Box::pin(async move { tx.get_tasks().await })) - .await - .unwrap() - .into_iter() - .map(|task| (task.id, task)) - .collect(); - assert_eq!(want_tasks, got_tasks); - } - } - - #[test] - fn roundtrip_config() { - roundtrip_encoding(Config { - common_config: CommonConfig { - database: generate_db_config(), - logging_config: generate_trace_config(), - metrics_config: generate_metrics_config(), - }, - write_schema: true, - tasks: Vec::from([new_dummy_task( - TaskId::random(), - VdafInstance::Prio3Aes128Count.into(), - Role::Leader, - )]), - }) - } -} diff --git a/janus_server/src/binary_utils.rs b/janus_server/src/binary_utils.rs index baa4a572c..4248905d3 100644 --- a/janus_server/src/binary_utils.rs +++ b/janus_server/src/binary_utils.rs @@ -161,23 +161,24 @@ fn parse_metadata_entry(input: &str) -> Result<(String, String)> { } /// BinaryContext provides contextual objects related to a Janus binary. -pub struct BinaryContext { +pub struct BinaryContext { pub clock: C, + pub options: Options, pub config: Config, pub datastore: Datastore, pub pool: Pool, } -pub async fn janus_main(clock: C, f: F) -> anyhow::Result<()> +pub async fn janus_main(clock: C, f: F) -> anyhow::Result<()> where - O: BinaryOptions, C: Clock, + Options: BinaryOptions, Config: BinaryConfig, - F: FnOnce(BinaryContext) -> Fut, + F: FnOnce(BinaryContext) -> Fut, Fut: Future>, { // Read arguments, read & parse config. - let options = O::from_args(); + let options = Options::from_args(); let common_options = options.common_options(); let mut config = { let config_content = @@ -228,6 +229,7 @@ where f(BinaryContext { clock, + options, config, datastore, pool, diff --git a/janus_server/src/datastore.rs b/janus_server/src/datastore.rs index d06791873..df4717242 100644 --- a/janus_server/src/datastore.rs +++ b/janus_server/src/datastore.rs @@ -3007,12 +3007,6 @@ pub mod test_util { Pool::builder(conn_mgr).build().unwrap() } - /// Write the Janus schema into the datastore. - pub async fn write_schema(&self) { - let client = self.pool().get().await.unwrap(); - client.batch_execute(SCHEMA).await.unwrap(); - } - /// Get a PostgreSQL connection string to connect to the temporary database. pub fn connection_string(&self) -> &str { &self.connection_string @@ -3142,7 +3136,8 @@ pub mod test_util { /// Dropping the second return value causes the database to be shut down & cleaned up. pub async fn ephemeral_datastore(clock: C) -> (Datastore, DbHandle) { let db_handle = ephemeral_db_handle(); - db_handle.write_schema().await; + let client = db_handle.pool().get().await.unwrap(); + client.batch_execute(SCHEMA).await.unwrap(); (db_handle.datastore(clock), db_handle) } From b37b7e5251f1b05e9a5001adeaf38fe51e5c158b Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Thu, 14 Jul 2022 17:11:58 -0700 Subject: [PATCH 2/7] janus_cli: add create_datastore_key command. The testing is done via a new library used to spin up an ephemeral Kubernetes cluster using `kind`. It interacts with `kind` by shelling out; I'm not happy about this, but I can't find an equivalent library for Rust. --- Cargo.lock | 375 ++++++++++++++++++++++++- janus_core/Cargo.toml | 4 +- janus_core/src/test_util/kubernetes.rs | 60 ++++ janus_core/src/test_util/mod.rs | 1 + janus_server/Cargo.toml | 4 +- janus_server/src/bin/janus_cli.rs | 96 ++++++- 6 files changed, 529 insertions(+), 11 deletions(-) create mode 100644 janus_core/src/test_util/kubernetes.rs diff --git a/Cargo.lock b/Cargo.lock index 05a8fa517..2b4631ba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -458,6 +458,22 @@ dependencies = [ "memchr", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "cpufeatures" version = "0.2.2" @@ -702,6 +718,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if 1.0.0", + "dirs-sys-next", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dunce" version = "1.0.2" @@ -802,6 +839,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1202,6 +1254,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1304,6 +1369,7 @@ dependencies = [ "futures", "hex", "hpke-dispatch", + "kube", "num_enum", "postgres-protocol", "postgres-types", @@ -1340,6 +1406,8 @@ dependencies = [ "itertools", "janus_core", "janus_server", + "k8s-openapi", + "kube", "lazy_static", "libc", "mockito", @@ -1388,6 +1456,96 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonpath_lib" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaa63191d68230cccb81c5aa23abd53ed64d83337cacbb25a7b8c7979523774f" +dependencies = [ + "log", + "serde", + "serde_json", +] + +[[package]] +name = "k8s-openapi" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f8de9873b904e74b3533f77493731ee26742418077503683db44e1b3c54aa5c" +dependencies = [ + "base64", + "bytes", + "chrono", + "http", + "percent-encoding", + "serde", + "serde-value", + "serde_json", + "url", +] + +[[package]] +name = "kube" +version = "0.65.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ec231e9ec9e84789f9eb414d1ac40ce6c90d0517fb272a335b4233f2e272b1e" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", +] + +[[package]] +name = "kube-client" +version = "0.65.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95dddb1fcced906d79cdae530ff39079c2d3772b2d623088fdbebe610bfa8217" +dependencies = [ + "base64", + "bytes", + "chrono", + "dirs-next", + "either", + "futures", + "http", + "http-body", + "hyper", + "hyper-timeout", + "hyper-tls", + "jsonpath_lib", + "k8s-openapi", + "kube-core", + "openssl", + "pem", + "pin-project", + "serde", + "serde_json", + "serde_yaml", + "thiserror", + "tokio", + "tokio-native-tls", + "tokio-util 0.6.9", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "0.65.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c52b6ab05d160691083430f6f431707a4e05b64903f2ffa0095ee5efde759117" +dependencies = [ + "chrono", + "form_urlencoded", + "http", + "k8s-openapi", + "once_cell", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1596,6 +1754,24 @@ dependencies = [ "twoway", ] +[[package]] +name = "native-tls" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.1" @@ -1683,6 +1859,51 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "618febf65336490dfcf20b73f885f5651a0c89c64c2d4a8c3662585a70bf5bd0" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5f9bd0c2710541a3cda73d6f9ac4f1b240de4ae261065d309dbe73d9dceb42f" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.17.0" @@ -1768,6 +1989,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "os_pipe" version = "1.0.1" @@ -1808,7 +2038,16 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.34.0", +] + +[[package]] +name = "pem" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c64931a1a212348ec4f3b4362585eca7159d0d09cbdf4a7f74f02173596fd4" +dependencies = [ + "base64", ] [[package]] @@ -1877,6 +2116,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + [[package]] name = "poly1305" version = "0.7.2" @@ -2207,6 +2452,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +dependencies = [ + "getrandom", + "redox_syscall", + "thiserror", +] + [[package]] name = "regex" version = "1.5.5" @@ -2363,6 +2619,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" +dependencies = [ + "lazy_static", + "windows-sys 0.36.1", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -2407,6 +2673,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.138" @@ -2416,6 +2705,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float 2.10.0", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.138" @@ -2433,6 +2732,7 @@ version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" dependencies = [ + "indexmap", "itoa", "ryu", "serde", @@ -2815,7 +3115,7 @@ dependencies = [ "byteorder", "integer-encoding", "log", - "ordered-float", + "ordered-float 1.1.1", "threadpool", ] @@ -2886,6 +3186,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.6" @@ -3107,6 +3417,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8" dependencies = [ + "base64", "bitflags", "bytes", "futures-core", @@ -3118,6 +3429,7 @@ dependencies = [ "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -3380,6 +3692,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -3639,11 +3957,24 @@ version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.34.0", + "windows_i686_gnu 0.34.0", + "windows_i686_msvc 0.34.0", + "windows_x86_64_gnu 0.34.0", + "windows_x86_64_msvc 0.34.0", +] + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", ] [[package]] @@ -3652,30 +3983,60 @@ version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + [[package]] name = "windows_i686_gnu" version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + [[package]] name = "windows_i686_msvc" version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + [[package]] name = "windows_x86_64_gnu" version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + [[package]] name = "windows_x86_64_msvc" version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "winreg" version = "0.10.1" diff --git a/janus_core/Cargo.toml b/janus_core/Cargo.toml index f03439cb2..70b428b89 100644 --- a/janus_core/Cargo.toml +++ b/janus_core/Cargo.toml @@ -13,8 +13,9 @@ rust-version = "1.60" database = ["dep:bytes", "dep:postgres-protocol", "dep:postgres-types"] test-util = [ "dep:assert_matches", - "dep:serde_json", "dep:futures", + "dep:kube", + "dep:serde_json", "dep:tracing", "dep:tracing-log", "dep:tracing-subscriber", @@ -29,6 +30,7 @@ bytes = { version = "1.1.0", optional = true } chrono = "0.4" hex = "0.4.3" hpke-dispatch = "0.3.0" +kube = { version = "0.65.0", optional = true } num_enum = "0.5.6" postgres-protocol = { version = "0.6.4", optional = true } postgres-types = { version = "0.2.3", optional = true } diff --git a/janus_core/src/test_util/kubernetes.rs b/janus_core/src/test_util/kubernetes.rs new file mode 100644 index 000000000..f6d896b3e --- /dev/null +++ b/janus_core/src/test_util/kubernetes.rs @@ -0,0 +1,60 @@ +//! Testing framework for functionality that interacts with Kubernetes. + +use kube::config::KubeConfigOptions; +use rand::{thread_rng, Rng}; +use std::process::{Command, Stdio}; + +/// EphemeralCluster represents a running ephemeral Kubernetes cluster for testing. Dropping an +/// EphemeralCluster will cause the associated Kubernetes cluster to be stopped & cleaned up. +pub struct EphemeralCluster { + name: String, +} + +impl EphemeralCluster { + /// Creates & starts a new ephemeral Kubernetes cluster. + pub fn create() -> Self { + // Choose a cluster name. + let mut randomness = [0u8; 4]; + thread_rng().fill(&mut randomness); + let cluster_name = format!("janus-ephemeral-{}", hex::encode(&randomness)); + + // Use kind to start the cluster. + assert!(Command::new("kind") + .args(["create", "cluster", "--name", &cluster_name]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .unwrap() + .success()); + + Self { name: cluster_name } + } + + /// Returns a new [`kube::Client`] configured to interact with this Kubernetes cluster. + pub async fn client(&self) -> kube::Client { + kube::Client::try_from( + kube::Config::from_kubeconfig(&KubeConfigOptions { + context: Some(format!("kind-{}", self.name)), + ..KubeConfigOptions::default() + }) + .await + .unwrap(), + ) + .unwrap() + } +} + +impl Drop for EphemeralCluster { + fn drop(&mut self) { + // Delete the cluster that was created when we created the EphemeralCluster. + assert!(Command::new("kind") + .args(["delete", "cluster", "--name", &self.name]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .unwrap() + .success()) + } +} diff --git a/janus_core/src/test_util/mod.rs b/janus_core/src/test_util/mod.rs index 4131eeefe..6b9bbbc66 100644 --- a/janus_core/src/test_util/mod.rs +++ b/janus_core/src/test_util/mod.rs @@ -9,6 +9,7 @@ use tracing_log::LogTracer; use tracing_subscriber::{prelude::*, EnvFilter, Registry}; pub mod dummy_vdaf; +pub mod kubernetes; pub mod runtime; /// A transcript of a VDAF run. All fields are indexed by natural role index (i.e., index 0 = diff --git a/janus_server/Cargo.toml b/janus_server/Cargo.toml index e09ffbfe6..3c5adc559 100644 --- a/janus_server/Cargo.toml +++ b/janus_server/Cargo.toml @@ -15,7 +15,6 @@ test-util = ["janus_core/test-util", "dep:lazy_static", "dep:testcontainers"] [dependencies] anyhow = "1" -assert_matches = "1" atty = "0.2" base64 = "0.13.0" bytes = "1.1.0" @@ -30,6 +29,8 @@ http = "0.2.8" hyper = "0.14.19" itertools = "0.10.3" janus_core = { path = "../janus_core", features = ["database"] } +k8s-openapi = { version = "*", features = ["v1_20"] } +kube = "0.65.0" lazy_static = { version = "1", optional = true } num_enum = "0.5.6" opentelemetry = { version = "0.17.0", features = ["metrics", "rt-tokio"] } @@ -63,6 +64,7 @@ uuid = { version = "1.1.2", features = ["v4"] } warp = { version = "^0.3", features = ["tls"] } [dev-dependencies] +assert_matches = "1" hex = { version = "0.4.3", features = ["serde"] } hyper = "0.14.19" janus_server = { path = ".", features = ["test-util"] } diff --git a/janus_server/src/bin/janus_cli.rs b/janus_server/src/bin/janus_cli.rs index 9570b61e2..02a01b83b 100644 --- a/janus_server/src/bin/janus_cli.rs +++ b/janus_server/src/bin/janus_cli.rs @@ -1,4 +1,5 @@ use anyhow::{Context, Result}; +use base64::STANDARD_NO_PAD; use deadpool_postgres::Pool; use janus_core::time::{Clock, RealClock}; use janus_server::{ @@ -7,8 +8,13 @@ use janus_server::{ datastore::{self, Datastore}, task::Task, }; +use k8s_openapi::api::core::v1::Secret; +use kube::api::{ObjectMeta, PostParams}; +use rand::{thread_rng, Rng}; +use ring::aead::AES_128_GCM; use serde::{Deserialize, Serialize}; use std::{ + collections::BTreeMap, path::{Path, PathBuf}, sync::Arc, }; @@ -71,10 +77,45 @@ async fn provision_tasks(datastore: &Datastore, tasks_file: &Path) .context("couldn't write tasks") } +async fn create_datastore_key( + kube_client: kube::Client, + k8s_namespace: &str, + k8s_secret_name: &str, +) -> Result<()> { + let secrets_api: kube::Api = kube::Api::namespaced(kube_client, k8s_namespace); + + // Generate a random datastore key & encode it into unpadded base64 as will be expected by + // consumers of the secret we are about to write. + let mut key_bytes = vec![0u8; AES_128_GCM.key_len()]; + thread_rng().fill(&mut key_bytes[..]); + let secret_content = base64::encode_config(&key_bytes, STANDARD_NO_PAD); + + // Write the secret. + secrets_api + .create( + &PostParams::default(), + &Secret { + metadata: ObjectMeta { + namespace: Some(k8s_namespace.to_string()), + name: Some(k8s_secret_name.to_string()), + ..ObjectMeta::default() + }, + string_data: Some(BTreeMap::from([( + "datastore_key".to_string(), + secret_content, + )])), + ..Secret::default() + }, + ) + .await + .context("couldn't write datastore key secret")?; + Ok(()) +} + #[derive(Debug, StructOpt)] #[structopt( - name = "janus-provision-task", - about = "Janus `provision task` command", + name = "janus_cli", + about = "Janus CLI tool", rename_all = "kebab-case", version = env!("CARGO_PKG_VERSION"), )] @@ -94,12 +135,24 @@ impl BinaryOptions for Options { #[derive(Debug, StructOpt)] enum Command { + /// Write the Janus database schema to the database. WriteSchema, + + /// Write a set of tasks identified in a file to the datastore. ProvisionTasks { /// A YAML file containing a list of tasks to be written. Existing tasks (matching by task /// ID) will be overwritten. tasks_file: PathBuf, }, + + /// Create a datastore key and write it to a Kubernetes secret. + CreateDatastoreKey { + /// The Kubernetes namespace to create the datastore key secret in. + k8s_namespace: String, + + /// The name of the Kubernetes secret to place the datastore key in. + k8s_secret_name: String, + }, } impl Command { @@ -109,6 +162,19 @@ impl Command { Command::ProvisionTasks { tasks_file } => { provision_tasks(&ctx.datastore, tasks_file).await } + Command::CreateDatastoreKey { + k8s_namespace, + k8s_secret_name, + } => { + create_datastore_key( + kube::Client::try_default() + .await + .context("couldn't connect to Kubernetes environment")?, + k8s_namespace, + k8s_secret_name, + ) + .await + } } } } @@ -128,9 +194,11 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { use super::Config; + use base64::STANDARD_NO_PAD; use janus_core::{ message::{Role, TaskId}, task::VdafInstance, + test_util::kubernetes, time::RealClock, }; use janus_server::{ @@ -141,6 +209,8 @@ mod tests { datastore::test_util::{ephemeral_datastore, ephemeral_db_handle}, task::test_util::new_dummy_task, }; + use k8s_openapi::api::core::v1::Secret; + use ring::aead::{UnboundKey, AES_128_GCM}; use std::{collections::HashMap, io::Write}; use tempfile::NamedTempFile; @@ -202,6 +272,28 @@ mod tests { assert_eq!(want_tasks, got_tasks); } + #[tokio::test] + async fn create_datastore_key() { + let k8s_cluster = kubernetes::EphemeralCluster::create(); + let kube_client = k8s_cluster.client().await; + + // Create a datastore key. + const NAMESPACE: &str = "default"; + const SECRET_NAME: &str = "secret-name"; + super::create_datastore_key(kube_client.clone(), NAMESPACE, SECRET_NAME) + .await + .unwrap(); + + // Verify that the secret was created. + let secrets_api: kube::Api = kube::Api::namespaced(kube_client, NAMESPACE); + let secret = secrets_api.get(SECRET_NAME).await.unwrap(); + let secret_data = secret.data.unwrap().get("datastore_key").unwrap().clone(); + + // Verify that the written secret data can be parsed as a datastore key. + let datastore_key_bytes = base64::decode_config(&secret_data.0, STANDARD_NO_PAD).unwrap(); + UnboundKey::new(&AES_128_GCM, &datastore_key_bytes).unwrap(); + } + #[test] fn roundtrip_config() { roundtrip_encoding(Config { From 90f703ee90cdca40a3e6cae654bd61c8a8087bcc Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Fri, 15 Jul 2022 11:46:46 -0700 Subject: [PATCH 3/7] Update Dockerfile to install openssl dev dependencies. Unfortunately, kube doesn't work with rustls when talking to local clusters: https://github.com/kube-rs/kube-rs/issues/153 Thankfully, openssl is only a dependency if the test-utils feature is enabled, so our release builds will not depend on openssl. Unfortunately, that means that our release builds will be interacting with Kubernetes using a different TLS library than our tests use. --- Cargo.lock | 68 ++++++++++++++++++++++++++++++++++++++++- Dockerfile | 2 +- janus_server/Cargo.toml | 7 +++-- 3 files changed, 73 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b4631ba9..2c0db06af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1229,6 +1229,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-openssl" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6ee5d7a8f718585d1c3c61dfde28ef5b0bb14734b4db13f5ada856cdc6c612b" +dependencies = [ + "http", + "hyper", + "linked_hash_set", + "once_cell", + "openssl", + "openssl-sys", + "parking_lot", + "tokio", + "tokio-openssl", + "tower-layer", +] + [[package]] name = "hyper-rustls" version = "0.23.0" @@ -1237,7 +1255,9 @@ checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ "http", "hyper", + "log", "rustls 0.20.4", + "rustls-native-certs", "tokio", "tokio-rustls 0.23.3", ] @@ -1510,6 +1530,8 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-openssl", + "hyper-rustls", "hyper-timeout", "hyper-tls", "jsonpath_lib", @@ -1518,6 +1540,8 @@ dependencies = [ "openssl", "pem", "pin-project", + "rustls 0.20.4", + "rustls-pemfile 0.2.1", "serde", "serde_json", "serde_yaml", @@ -1564,6 +1588,15 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "linked_hash_set" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47186c6da4d81ca383c7c47c1bfc80f4b95f4720514d860a5407aaf4233f9588" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lock_api" version = "0.4.7" @@ -2522,7 +2555,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.20.4", - "rustls-pemfile", + "rustls-pemfile 1.0.0", "serde", "serde_json", "serde_urlencoded", @@ -2583,6 +2616,27 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.0", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + [[package]] name = "rustls-pemfile" version = "1.0.0" @@ -3196,6 +3250,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-openssl" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08f9ffb7809f1b20c1b398d92acf4cc719874b3b2b2d9ea2f09b4a80350878a" +dependencies = [ + "futures-util", + "openssl", + "openssl-sys", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.6" diff --git a/Dockerfile b/Dockerfile index e808c7262..36c01ef6f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM rust:1.62.0-alpine as builder ARG BINARY=aggregator -RUN apk add libc-dev +RUN apk add libc-dev openssl-dev WORKDIR /src COPY Cargo.toml /src/Cargo.toml diff --git a/janus_server/Cargo.toml b/janus_server/Cargo.toml index 3c5adc559..efab53da8 100644 --- a/janus_server/Cargo.toml +++ b/janus_server/Cargo.toml @@ -7,11 +7,14 @@ publish = false rust-version = "1.60" [features] +default = ["kube-rustls"] tokio-console = ["dep:console-subscriber"] jaeger = ["dep:tracing-opentelemetry", "dep:opentelemetry-jaeger"] otlp = ["dep:tracing-opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tonic"] prometheus = ["dep:opentelemetry-prometheus", "dep:prometheus"] test-util = ["janus_core/test-util", "dep:lazy_static", "dep:testcontainers"] +kube-rustls = ["kube/rustls-tls"] +kube-openssl = ["kube/openssl-tls"] [dependencies] anyhow = "1" @@ -30,7 +33,7 @@ hyper = "0.14.19" itertools = "0.10.3" janus_core = { path = "../janus_core", features = ["database"] } k8s-openapi = { version = "*", features = ["v1_20"] } -kube = "0.65.0" +kube = { version = "0.65.0", default-features = false, features = ["client"] } lazy_static = { version = "1", optional = true } num_enum = "0.5.6" opentelemetry = { version = "0.17.0", features = ["metrics", "rt-tokio"] } @@ -67,7 +70,7 @@ warp = { version = "^0.3", features = ["tls"] } assert_matches = "1" hex = { version = "0.4.3", features = ["serde"] } hyper = "0.14.19" -janus_server = { path = ".", features = ["test-util"] } +janus_server = { path = ".", default-features = false, features = ["kube-openssl", "test-util"] } libc = "0.2.126" mockito = "0.31.0" serde_test = "1.0.139" From 162fc88d42067c7aeeb3dae9aefc50d43f81797d Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Fri, 15 Jul 2022 12:35:05 -0700 Subject: [PATCH 4/7] Use a temporary kubeconfig file for ephemeral K8S clusters. --- Cargo.lock | 1 + janus_core/Cargo.toml | 4 +- janus_core/src/test_util/kubernetes.rs | 54 ++++++++++++++++++++++---- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c0db06af..06dc2c6a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1398,6 +1398,7 @@ dependencies = [ "ring", "serde", "serde_json", + "tempfile", "thiserror", "tokio", "tracing", diff --git a/janus_core/Cargo.toml b/janus_core/Cargo.toml index 70b428b89..21fb258b0 100644 --- a/janus_core/Cargo.toml +++ b/janus_core/Cargo.toml @@ -16,6 +16,7 @@ test-util = [ "dep:futures", "dep:kube", "dep:serde_json", + "dep:tempfile", "dep:tracing", "dep:tracing-log", "dep:tracing-subscriber", @@ -45,6 +46,7 @@ tokio = { version = "^1.19", features = ["rt"] } assert_matches = { version = "1", optional = true } serde_json = { version = "1.0.82", optional = true } futures = { version = "0.3.21", optional = true } +tempfile = { version = "3", optional = true } tracing = { version = "0.1.34", optional = true } tracing-log = { version = "0.1.3", optional = true } -tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true } +tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true } \ No newline at end of file diff --git a/janus_core/src/test_util/kubernetes.rs b/janus_core/src/test_util/kubernetes.rs index f6d896b3e..eac831e91 100644 --- a/janus_core/src/test_util/kubernetes.rs +++ b/janus_core/src/test_util/kubernetes.rs @@ -1,18 +1,23 @@ //! Testing framework for functionality that interacts with Kubernetes. -use kube::config::KubeConfigOptions; +use kube::config::{KubeConfigOptions, Kubeconfig}; use rand::{thread_rng, Rng}; use std::process::{Command, Stdio}; +use tempfile::{NamedTempFile, TempPath}; /// EphemeralCluster represents a running ephemeral Kubernetes cluster for testing. Dropping an /// EphemeralCluster will cause the associated Kubernetes cluster to be stopped & cleaned up. pub struct EphemeralCluster { name: String, + kubeconfig_path: TempPath, } impl EphemeralCluster { /// Creates & starts a new ephemeral Kubernetes cluster. pub fn create() -> Self { + // Choose a temporary file location for our kube config. + let kubeconfig_path = NamedTempFile::new().unwrap().into_temp_path(); + // Choose a cluster name. let mut randomness = [0u8; 4]; thread_rng().fill(&mut randomness); @@ -20,7 +25,14 @@ impl EphemeralCluster { // Use kind to start the cluster. assert!(Command::new("kind") - .args(["create", "cluster", "--name", &cluster_name]) + .args([ + "create", + "cluster", + "--kubeconfig", + &kubeconfig_path.to_string_lossy(), + "--name", + &cluster_name, + ]) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) @@ -28,16 +40,22 @@ impl EphemeralCluster { .unwrap() .success()); - Self { name: cluster_name } + Self { + name: cluster_name, + kubeconfig_path, + } } /// Returns a new [`kube::Client`] configured to interact with this Kubernetes cluster. pub async fn client(&self) -> kube::Client { kube::Client::try_from( - kube::Config::from_kubeconfig(&KubeConfigOptions { - context: Some(format!("kind-{}", self.name)), - ..KubeConfigOptions::default() - }) + kube::Config::from_custom_kubeconfig( + Kubeconfig::read_from(&self.kubeconfig_path).unwrap(), + &KubeConfigOptions { + context: Some(format!("kind-{}", self.name)), + ..KubeConfigOptions::default() + }, + ) .await .unwrap(), ) @@ -49,7 +67,14 @@ impl Drop for EphemeralCluster { fn drop(&mut self) { // Delete the cluster that was created when we created the EphemeralCluster. assert!(Command::new("kind") - .args(["delete", "cluster", "--name", &self.name]) + .args([ + "delete", + "cluster", + "--kubeconfig", + &self.kubeconfig_path.to_string_lossy(), + "--name", + &self.name, + ]) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) @@ -58,3 +83,16 @@ impl Drop for EphemeralCluster { .success()) } } + +#[cfg(test)] +mod tests { + use super::EphemeralCluster; + + #[test] + fn create_clusters() { + // Create a couple of clusters, then drop them, to test that creating multiple clusters + // does not lead to collisions in some namespace. + let _first_cluster = EphemeralCluster::create(); + let _second_cluster = EphemeralCluster::create(); + } +} From 0abd932e198eb0e0c432b42aa8b207cd1dc4e5f9 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Fri, 15 Jul 2022 12:50:53 -0700 Subject: [PATCH 5/7] Remove openssl dependency from Dockerfile. It's no longer necessary since the crate features are set up to (rightfully) not include openssl in release builds. --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 36c01ef6f..e808c7262 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM rust:1.62.0-alpine as builder ARG BINARY=aggregator -RUN apk add libc-dev openssl-dev +RUN apk add libc-dev WORKDIR /src COPY Cargo.toml /src/Cargo.toml From c7a42e1b39c42d3510505e5d5b6f28e38d057a02 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Fri, 15 Jul 2022 13:53:49 -0700 Subject: [PATCH 6/7] Add log line for creating datastore key. --- janus_server/src/bin/janus_cli.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/janus_server/src/bin/janus_cli.rs b/janus_server/src/bin/janus_cli.rs index 02a01b83b..4a85381f1 100644 --- a/janus_server/src/bin/janus_cli.rs +++ b/janus_server/src/bin/janus_cli.rs @@ -82,6 +82,7 @@ async fn create_datastore_key( k8s_namespace: &str, k8s_secret_name: &str, ) -> Result<()> { + info!("Creating datastore key"); let secrets_api: kube::Api = kube::Api::namespaced(kube_client, k8s_namespace); // Generate a random datastore key & encode it into unpadded base64 as will be expected by From bd255daa3ed794d5aa3c46b1f7fa0ffc40bbd426 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Fri, 15 Jul 2022 13:57:50 -0700 Subject: [PATCH 7/7] Note `kind` dependency for tests in the README. Also, update a stale reference to "PPM" as the protocol name to "DAP". --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5d6382fda..c13a8a56c 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [actions]: https://github.com/divviup/janus/actions?query=branch%3Amain Janus is an experimental implementation of the -[Privacy Preserving Measurement (PPM) specification](https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap). +[Distributed Aggregation Protocol (DAP) specification](https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap). It is currently in active development. @@ -19,7 +19,11 @@ aggregator --config-file --role ## Running tests -To run janus tests, ensure docker is running locally and execute `cargo test`. +Tests require that [docker](https://www.docker.com) & [kind](https://kind.sigs.k8s.io) be installed +on the machine running the tests and in the `PATH` of the test-runner's environment. The docker +daemon must be running. + +To run janus tests, execute `cargo test`. ## Container image