forked from kube-rs/kube
-
Notifications
You must be signed in to change notification settings - Fork 0
/
job_api.rs
80 lines (72 loc) · 2.47 KB
/
job_api.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::batch::v1::Job;
use serde_json::json;
use kube::{
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
Client,
};
/// Creates a minimal `batch/v1` Job, watches it until it reports a completion
/// time (or the watch ends), then deletes the Job record.
///
/// The target namespace is read from the `NAMESPACE` environment variable and
/// falls back to `"default"` when that variable is unset or unreadable.
///
/// # Errors
///
/// Returns an error if the client cannot be constructed, the Job manifest
/// fails to deserialize, or any create / watch / delete request fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    std::env::set_var("RUST_LOG", "info,kube=debug");
    env_logger::init();
    let client = Client::try_default().await?;
    // Build the fallback lazily so nothing is allocated when NAMESPACE is set.
    let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

    // Create a Job
    let job_name = "empty-job";
    let my_job: Job = serde_json::from_value(json!({
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": job_name,
        },
        "spec": {
            "template": {
                "metadata": {
                    "name": "empty-job-pod"
                },
                "spec": {
                    "containers": [{
                        "name": "empty",
                        "image": "alpine:latest"
                    }],
                    "restartPolicy": "Never",
                }
            }
        }
    }))?;

    let jobs: Api<Job> = Api::namespaced(client, &namespace);
    let pp = PostParams::default();
    jobs.create(&pp, &my_job).await?;

    // See if it ran to completion
    let lp = ListParams::default()
        .fields(&format!("metadata.name={}", job_name)) // only want events for our job
        .timeout(20); // should be done by then
    let mut stream = jobs.watch(&lp, "").await?.boxed();

    // The loop ends either on the first event carrying a completion time or
    // when the watch stream is exhausted; cleanup below runs in both cases.
    while let Some(status) = stream.try_next().await? {
        match status {
            WatchEvent::Added(s) => info!("Added {}", s.name()),
            WatchEvent::Modified(s) => {
                // A Modified event may arrive before any status has been
                // populated; treat that as "not complete yet" instead of
                // panicking, and borrow rather than clone the status.
                let is_complete = s
                    .status
                    .as_ref()
                    .and_then(|job_status| job_status.completion_time.as_ref())
                    .is_some();
                if is_complete {
                    info!("Modified: {} is complete", s.name());
                    break;
                }
                info!("Modified: {} is running", s.name());
            }
            WatchEvent::Deleted(s) => info!("Deleted {}", s.name()),
            WatchEvent::Error(s) => error!("{}", s),
            _ => {}
        }
    }

    // Clean up the old job record..
    info!("Deleting the job record.");
    let mut dp = DeleteParams::default();
    // First issue the delete as a dry run, then perform the real deletion.
    dp.dry_run = true;
    jobs.delete(job_name, &dp).await?;
    dp.dry_run = false;
    jobs.delete(job_name, &dp).await?;
    Ok(())
}