Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename provision_task to janus_cli, add create-datastore-key command. #314

Merged
merged 8 commits into from Jul 18, 2022
444 changes: 436 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions README.md
Expand Up @@ -5,7 +5,7 @@
[actions]: https://github.com/divviup/janus/actions?query=branch%3Amain

Janus is an experimental implementation of the
[Privacy Preserving Measurement (PPM) specification](https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap).
[Distributed Aggregation Protocol (DAP) specification](https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap).

It is currently in active development.

Expand All @@ -19,7 +19,11 @@ aggregator --config-file <config-file> --role <role>

## Running tests

To run janus tests, ensure docker is running locally and execute `cargo test`.
Tests require that [docker](https://www.docker.com) & [kind](https://kind.sigs.k8s.io) be installed
on the machine running the tests and in the `PATH` of the test-runner's environment. The docker
daemon must be running.

To run janus tests, execute `cargo test`.

## Container image

Expand Down
8 changes: 6 additions & 2 deletions janus_core/Cargo.toml
Expand Up @@ -13,8 +13,10 @@ rust-version = "1.60"
database = ["dep:bytes", "dep:postgres-protocol", "dep:postgres-types"]
test-util = [
"dep:assert_matches",
"dep:serde_json",
"dep:futures",
"dep:kube",
"dep:serde_json",
"dep:tempfile",
"dep:tracing",
"dep:tracing-log",
"dep:tracing-subscriber",
Expand All @@ -29,6 +31,7 @@ bytes = { version = "1.1.0", optional = true }
chrono = "0.4"
hex = "0.4.3"
hpke-dispatch = "0.3.0"
kube = { version = "0.65.0", optional = true }
num_enum = "0.5.6"
postgres-protocol = { version = "0.6.4", optional = true }
postgres-types = { version = "0.2.3", optional = true }
Expand All @@ -43,6 +46,7 @@ tokio = { version = "^1.19", features = ["rt"] }
assert_matches = { version = "1", optional = true }
serde_json = { version = "1.0.82", optional = true }
futures = { version = "0.3.21", optional = true }
tempfile = { version = "3", optional = true }
tracing = { version = "0.1.34", optional = true }
tracing-log = { version = "0.1.3", optional = true }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true }
98 changes: 98 additions & 0 deletions janus_core/src/test_util/kubernetes.rs
@@ -0,0 +1,98 @@
//! Testing framework for functionality that interacts with Kubernetes.

use kube::config::{KubeConfigOptions, Kubeconfig};
use rand::{thread_rng, Rng};
use std::process::{Command, Stdio};
use tempfile::{NamedTempFile, TempPath};

/// EphemeralCluster represents a running ephemeral Kubernetes cluster for testing. Dropping an
/// EphemeralCluster will cause the associated Kubernetes cluster to be stopped & cleaned up.
pub struct EphemeralCluster {
name: String,
kubeconfig_path: TempPath,
}

impl EphemeralCluster {
/// Creates & starts a new ephemeral Kubernetes cluster.
pub fn create() -> Self {
// Choose a temporary file location for our kube config.
let kubeconfig_path = NamedTempFile::new().unwrap().into_temp_path();

// Choose a cluster name.
let mut randomness = [0u8; 4];
thread_rng().fill(&mut randomness);
let cluster_name = format!("janus-ephemeral-{}", hex::encode(&randomness));

// Use kind to start the cluster.
assert!(Command::new("kind")
.args([
"create",
"cluster",
"--kubeconfig",
&kubeconfig_path.to_string_lossy(),
"--name",
&cluster_name,
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.unwrap()
.success());

Self {
name: cluster_name,
kubeconfig_path,
}
}

/// Returns a new [`kube::Client`] configured to interact with this Kubernetes cluster.
pub async fn client(&self) -> kube::Client {
kube::Client::try_from(
kube::Config::from_custom_kubeconfig(
Kubeconfig::read_from(&self.kubeconfig_path).unwrap(),
&KubeConfigOptions {
context: Some(format!("kind-{}", self.name)),
..KubeConfigOptions::default()
},
)
.await
.unwrap(),
)
.unwrap()
}
}

impl Drop for EphemeralCluster {
fn drop(&mut self) {
// Delete the cluster that was created when we created the EphemeralCluster.
assert!(Command::new("kind")
.args([
"delete",
"cluster",
"--kubeconfig",
&self.kubeconfig_path.to_string_lossy(),
"--name",
&self.name,
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.unwrap()
.success())
}
}

#[cfg(test)]
mod tests {
use super::EphemeralCluster;

#[test]
fn create_clusters() {
// Create a couple of clusters, then drop them, to test that creating multiple clusters
// does not lead to collisions in some namespace.
let _first_cluster = EphemeralCluster::create();
let _second_cluster = EphemeralCluster::create();
}
}
1 change: 1 addition & 0 deletions janus_core/src/test_util/mod.rs
Expand Up @@ -9,6 +9,7 @@ use tracing_log::LogTracer;
use tracing_subscriber::{prelude::*, EnvFilter, Registry};

pub mod dummy_vdaf;
pub mod kubernetes;
pub mod runtime;

/// A transcript of a VDAF run. All fields are indexed by natural role index (i.e., index 0 =
Expand Down
7 changes: 6 additions & 1 deletion janus_server/Cargo.toml
Expand Up @@ -7,11 +7,14 @@ publish = false
rust-version = "1.60"

[features]
default = ["kube-rustls"]
tokio-console = ["dep:console-subscriber"]
jaeger = ["dep:tracing-opentelemetry", "dep:opentelemetry-jaeger"]
otlp = ["dep:tracing-opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tonic"]
prometheus = ["dep:opentelemetry-prometheus", "dep:prometheus"]
test-util = ["janus_core/test-util", "dep:lazy_static", "dep:testcontainers"]
kube-rustls = ["kube/rustls-tls"]
kube-openssl = ["kube/openssl-tls"]

[dependencies]
anyhow = "1"
Expand All @@ -29,6 +32,8 @@ http = "0.2.8"
hyper = "0.14.20"
itertools = "0.10.3"
janus_core = { path = "../janus_core", features = ["database"] }
k8s-openapi = { version = "*", features = ["v1_20"] }
kube = { version = "0.65.0", default-features = false, features = ["client"] }
lazy_static = { version = "1", optional = true }
num_enum = "0.5.6"
opentelemetry = { version = "0.17.0", features = ["metrics", "rt-tokio"] }
Expand Down Expand Up @@ -65,7 +70,7 @@ warp = { version = "^0.3", features = ["tls"] }
assert_matches = "1"
hex = { version = "0.4.3", features = ["serde"] }
hyper = "0.14.20"
janus_server = { path = ".", features = ["test-util"] }
janus_server = { path = ".", default-features = false, features = ["kube-openssl", "test-util"] }
libc = "0.2.126"
mockito = "0.31.0"
serde_test = "1.0.139"
Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/aggregation_job_creator.rs
Expand Up @@ -9,7 +9,7 @@ use structopt::StructOpt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
// Start creating aggregation jobs.
Arc::new(AggregationJobCreator::new(
ctx.datastore,
Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/aggregation_job_driver.rs
Expand Up @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
"/aggregation_job_driver",
);

janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let meter = opentelemetry::global::meter("aggregation_job_driver");
let datastore = Arc::new(ctx.datastore);
let aggregation_job_driver = Arc::new(AggregationJobDriver::new(
Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/aggregator.rs
Expand Up @@ -13,7 +13,7 @@ use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let shutdown_signal =
setup_signal_handler().context("failed to register SIGTERM signal handler")?;

Expand Down
2 changes: 1 addition & 1 deletion janus_server/src/bin/collect_job_driver.rs
Expand Up @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
"/collect_job_driver"
);

janus_main::<Options, _, Config, _, _>(RealClock::default(), |ctx| async move {
janus_main::<_, Options, Config, _, _>(RealClock::default(), |ctx| async move {
let meter = opentelemetry::global::meter("collect_job_driver");
let datastore = Arc::new(ctx.datastore);
let collect_job_driver = Arc::new(CollectJobDriver::new(
Expand Down