Skip to content

Commit

Permalink
Rename provision_task to janus_cli, add create-datastore-key co…
Browse files Browse the repository at this point in the history
…mmand. (#314)

The testing is done via a new library used to spin up an ephemeral
Kubernetes cluster using `kind`. It interacts with `kind` by shelling
out; I'm not happy about this, but I can't find an equivalent library
for Rust.
  • Loading branch information
branlwyd committed Jul 18, 2022
1 parent dee87c4 commit 7beee2a
Show file tree
Hide file tree
Showing 14 changed files with 874 additions and 241 deletions.
444 changes: 436 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions README.md
Expand Up @@ -5,7 +5,7 @@
[actions]: https://github.com/divviup/janus/actions?query=branch%3Amain

Janus is an experimental implementation of the
[Privacy Preserving Measurement (PPM) specification](https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap).
[Distributed Aggregation Protocol (DAP) specification](https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap).

It is currently in active development.

Expand All @@ -19,7 +19,11 @@ aggregator --config-file <config-file> --role <role>

## Running tests

To run janus tests, ensure docker is running locally and execute `cargo test`.
Tests require that [docker](https://www.docker.com) & [kind](https://kind.sigs.k8s.io) be installed
on the machine running the tests and in the `PATH` of the test-runner's environment. The docker
daemon must be running.

To run janus tests, execute `cargo test`.

## Container image

Expand Down
8 changes: 6 additions & 2 deletions janus_core/Cargo.toml
Expand Up @@ -13,8 +13,10 @@ rust-version = "1.60"
database = ["dep:bytes", "dep:postgres-protocol", "dep:postgres-types"]
test-util = [
"dep:assert_matches",
"dep:serde_json",
"dep:futures",
"dep:kube",
"dep:serde_json",
"dep:tempfile",
"dep:tracing",
"dep:tracing-log",
"dep:tracing-subscriber",
Expand All @@ -29,6 +31,7 @@ bytes = { version = "1.1.0", optional = true }
chrono = "0.4"
hex = "0.4.3"
hpke-dispatch = "0.3.0"
kube = { version = "0.65.0", optional = true }
num_enum = "0.5.6"
postgres-protocol = { version = "0.6.4", optional = true }
postgres-types = { version = "0.2.3", optional = true }
Expand All @@ -43,6 +46,7 @@ tokio = { version = "^1.19", features = ["rt"] }
assert_matches = { version = "1", optional = true }
serde_json = { version = "1.0.82", optional = true }
futures = { version = "0.3.21", optional = true }
tempfile = { version = "3", optional = true }
tracing = { version = "0.1.34", optional = true }
tracing-log = { version = "0.1.3", optional = true }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true }
98 changes: 98 additions & 0 deletions janus_core/src/test_util/kubernetes.rs
@@ -0,0 +1,98 @@
//! Testing framework for functionality that interacts with Kubernetes.

use kube::config::{KubeConfigOptions, Kubeconfig};
use rand::{thread_rng, Rng};
use std::process::{Command, Stdio};
use tempfile::{NamedTempFile, TempPath};

/// EphemeralCluster represents a running ephemeral Kubernetes cluster for testing. Dropping an
/// EphemeralCluster will cause the associated Kubernetes cluster to be stopped & cleaned up.
pub struct EphemeralCluster {
name: String,
kubeconfig_path: TempPath,
}

impl EphemeralCluster {
/// Creates & starts a new ephemeral Kubernetes cluster.
pub fn create() -> Self {
// Choose a temporary file location for our kube config.
let kubeconfig_path = NamedTempFile::new().unwrap().into_temp_path();

// Choose a cluster name.
let mut randomness = [0u8; 4];
thread_rng().fill(&mut randomness);
let cluster_name = format!("janus-ephemeral-{}", hex::encode(&randomness));

// Use kind to start the cluster.
assert!(Command::new("kind")
.args([
"create",
"cluster",
"--kubeconfig",
&kubeconfig_path.to_string_lossy(),
"--name",
&cluster_name,
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.unwrap()
.success());

Self {
name: cluster_name,
kubeconfig_path,
}
}

/// Returns a new [`kube::Client`] configured to interact with this Kubernetes cluster.
pub async fn client(&self) -> kube::Client {
kube::Client::try_from(
kube::Config::from_custom_kubeconfig(
Kubeconfig::read_from(&self.kubeconfig_path).unwrap(),
&KubeConfigOptions {
context: Some(format!("kind-{}", self.name)),
..KubeConfigOptions::default()
},
)
.await
.unwrap(),
)
.unwrap()
}
}

impl Drop for EphemeralCluster {
fn drop(&mut self) {
// Delete the cluster that was created when we created the EphemeralCluster.
assert!(Command::new("kind")
.args([
"delete",
"cluster",
"--kubeconfig",
&self.kubeconfig_path.to_string_lossy(),
"--name",
&self.name,
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.unwrap()
.success())
}
}

#[cfg(test)]
mod tests {
use super::EphemeralCluster;

#[test]
fn create_clusters() {
// Create a couple of clusters, then drop them, to test that creating multiple clusters
// does not lead to collisions in some namespace.
let _first_cluster = EphemeralCluster::create();
let _second_cluster = EphemeralCluster::create();
}
}
1 change: 1 addition & 0 deletions janus_core/src/test_util/mod.rs
Expand Up @@ -9,6 +9,7 @@ use tracing_log::LogTracer;
use tracing_subscriber::{prelude::*, EnvFilter, Registry};

pub mod dummy_vdaf;
pub mod kubernetes;
pub mod runtime;

/// A transcript of a VDAF run. All fields are indexed by natural role index (i.e., index 0 =
Expand Down
7 changes: 6 additions & 1 deletion janus_server/Cargo.toml
Expand Up @@ -7,11 +7,14 @@ publish = false
rust-version = "1.60"

[features]
default = ["kube-rustls"]
tokio-console = ["dep:console-subscriber"]
jaeger = ["dep:tracing-opentelemetry", "dep:opentelemetry-jaeger"]
otlp = ["dep:tracing-opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tonic"]
prometheus = ["dep:opentelemetry-prometheus", "dep:prometheus"]
test-util = ["janus_core/test-util", "dep:lazy_static", "dep:testcontainers"]
kube-rustls = ["kube/rustls-tls"]
kube-openssl = ["kube/openssl-tls"]

[dependencies]
anyhow = "1"
Expand All @@ -29,6 +32,8 @@ http = "0.2.8"
hyper = "0.14.20"
itertools = "0.10.3"
janus_core = { path = "../janus_core", features = ["database"] }
k8s-openapi = { version = "*", features = ["v1_20"] }
kube = { version = "0.65.0", default-features = false, features = ["client"] }
lazy_static = { version = "1", optional = true }
num_enum = "0.5.6"
opentelemetry = { version = "0.17.0", features = ["metrics", "rt-tokio"] }
Expand Down Expand Up @@ -65,7 +70,7 @@ warp = { version = "^0.3", features = ["tls"] }
assert_matches = "1"
hex = { version = "0.4.3", features = ["serde"] }
hyper = "0.14.20"
janus_server = { path = ".", features = ["test-util"] }
janus_server = { path = ".", default-features = false, features = ["kube-openssl", "test-util"] }
libc = "0.2.126"
mockito = "0.31.0"
serde_test = "1.0.139"
Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/aggregation_job_creator.rs
Expand Up @@ -9,7 +9,7 @@ use structopt::StructOpt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
// Start creating aggregation jobs.
Arc::new(AggregationJobCreator::new(
ctx.datastore,
Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/aggregation_job_driver.rs
Expand Up @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
"/aggregation_job_driver",
);

janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let meter = opentelemetry::global::meter("aggregation_job_driver");
let datastore = Arc::new(ctx.datastore);
let aggregation_job_driver = Arc::new(AggregationJobDriver::new(
Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/aggregator.rs
Expand Up @@ -13,7 +13,7 @@ use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let shutdown_signal =
setup_signal_handler().context("failed to register SIGTERM signal handler")?;

Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/collect_job_driver.rs
Expand Up @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
"/collect_job_driver"
);

janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let meter = opentelemetry::global::meter("collect_job_driver");
let datastore = Arc::new(ctx.datastore);
let collect_job_driver = Arc::new(CollectJobDriver::new(
Expand Down

0 comments on commit 7beee2a

Please sign in to comment.