Skip to content

Commit

Permalink
implement resources_by_stability
Browse files Browse the repository at this point in the history
Signed-off-by: chengqinglin <chengqinglin@icloud.com>
  • Loading branch information
imuxin committed Sep 30, 2022
1 parent 780b4df commit 4bdfe02
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/kubectl.rs
Expand Up @@ -64,7 +64,7 @@ fn resolve_api_resource(discovery: &Discovery, name: &str) -> Option<(ApiResourc
.groups()
.flat_map(|group| {
group
.recommended_resources()
.resources_by_stability()
.into_iter()
.map(move |res| (group, res))
})
Expand Down
1 change: 0 additions & 1 deletion kube-client/Cargo.toml
Expand Up @@ -71,7 +71,6 @@ rand = { version = "0.8.3", optional = true }
secrecy = { version = "0.8.0", features = ["alloc", "serde"] }
tracing = { version = "0.1.29", features = ["log"], optional = true }
hyper-openssl = { version = "0.9.2", optional = true }
itertools = "0.10.5"

[dependencies.k8s-openapi]
version = "0.16.0"
Expand Down
45 changes: 37 additions & 8 deletions kube-client/src/discovery/apigroup.rs
@@ -1,13 +1,12 @@
use super::parse::{self, GroupVersionData};
use crate::{error::DiscoveryError, Client, Error, Result};
use itertools::Itertools;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIGroup, APIVersions};
pub use kube_core::discovery::{verbs, ApiCapabilities, ApiResource, Scope};
use kube_core::{
gvk::{GroupVersion, GroupVersionKind, ParseGroupVersionError},
Version,
};
use std::cmp::Reverse;
use std::{cmp::Reverse, collections::HashMap, iter::Iterator};

/// Describes one API group's collected resources and capabilities.
///
Expand Down Expand Up @@ -240,12 +239,42 @@ impl ApiGroup {
///
/// This is equivalent to taking the [`ApiGroup::versioned_resources`] at the [`ApiGroup::preferred_version_or_latest`].
pub fn recommended_resources(&self) -> Vec<(ApiResource, ApiCapabilities)> {
    // Resolve the server's preferred version for this group (falling back to
    // the latest version by priority when none is advertised), then return
    // the resources served at exactly that version.
    // NOTE(review): the pasted diff interleaved the removed itertools-based
    // body with this one; only the post-commit implementation is kept.
    let ver = self.preferred_version_or_latest();
    self.versioned_resources(ver)
}

/// Returns the most stable version of every resource in the group, including resources that are only served at older (less stable) versions.
///
/// ```no_run
/// use kube::{Client, api::{Api, DynamicObject}, discovery::{self, verbs}, ResourceExt};
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let client = Client::try_default().await?;
/// let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
/// for (ar, caps) in apigroup.resources_by_stability() {
/// if !caps.supports_operation(verbs::LIST) {
/// continue;
/// }
/// let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
/// for inst in api.list(&Default::default()).await? {
/// println!("Found {}: {}", ar.kind, inst.name());
/// }
/// }
/// Ok(())
/// }
/// ```
/// See an example in [examples/kubectl.rs](https://github.com/kube-rs/kube/blob/main/examples/kubectl.rs)
pub fn resources_by_stability(&self) -> Vec<(ApiResource, ApiCapabilities)> {
let mut lookup = HashMap::new();
self.data.iter().for_each(|gvd| {
gvd.resources.iter().for_each(|resource| {
lookup
.entry(resource.0.kind.clone())
.or_insert_with(Vec::new)
.push(resource);
})
});
lookup
.into_iter()
.map(|(_, mut v)| {
v.sort_by_cached_key(|(ar, _)| Reverse(Version::parse(ar.version.as_str()).priority()));
Expand Down
110 changes: 105 additions & 5 deletions kube/src/lib.rs
Expand Up @@ -247,11 +247,14 @@ mod test {
let foos: Api<Foo> = Api::default_namespaced(client.clone());
// Apply from generated struct
{
let foo = Foo::new("baz", FooSpec {
name: "baz".into(),
info: Some("old baz".into()),
replicas: 1,
});
let foo = Foo::new(
"baz",
FooSpec {
name: "baz".into(),
info: Some("old baz".into()),
replicas: 1,
},
);
let o = foos.patch("baz", &ssapply, &Patch::Apply(&foo)).await?;
assert_eq!(o.spec.name, "baz");
let oref = o.object_ref(&());
Expand Down Expand Up @@ -431,6 +434,103 @@ mod test {
Ok(())
}

#[tokio::test]
#[ignore] // needs cluster (fetches api resources, and lists all)
#[cfg(feature = "derive")] // bare predicate: `all()` with one arg is redundant (clippy::non_minimal_cfg)
async fn derived_resources_by_stability_discoverable() -> Result<(), Box<dyn std::error::Error>> {
    use crate::{
        discovery::{self, Discovery},
        runtime::wait::{await_condition, conditions},
    };
    use kube_core::crd::merge_crds;

    // A kind that only exists at a pre-release version; resources_by_stability
    // must still surface it rather than dropping it.
    mod v1alpha1 {
        use super::*;

        #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
        #[kube(
            group = "kube.rs",
            version = "v1alpha1",
            kind = "TestLowVersionCr",
            namespaced
        )]
        #[kube(crates(kube_core = "crate::core"))] // for dev-dep test structure
        pub struct TestLowVersionCrSpec {}
    }

    // The same kind served at two versions; the stable v1 should be the one
    // resources_by_stability picks over v2alpha1.
    mod v1 {
        use super::*;

        #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
        #[kube(group = "kube.rs", version = "v1", kind = "TestCr", namespaced)]
        #[kube(crates(kube_core = "crate::core"))] // for dev-dep test structure
        pub struct TestCrSpec {}
    }

    mod v2alpha1 {
        use super::*;

        #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
        #[kube(group = "kube.rs", version = "v2alpha1", kind = "TestCr", namespaced)]
        #[kube(crates(kube_core = "crate::core"))] // for dev-dep test structure
        pub struct TestCrSpec {}
    }

    // Server-side applies the CRD, then waits (up to 10s) for the apiserver
    // to report it as established before the test relies on discovery.
    async fn apply_crd(
        client: Client,
        name: &str,
        crd: CustomResourceDefinition,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
        let ssapply = PatchParams::apply("kube").force();
        crds.patch(name, &ssapply, &Patch::Apply(&crd)).await?;
        let establish = await_condition(crds.clone(), name, conditions::is_crd_established());
        // `?` propagates a timeout; the inner watch result itself is not needed.
        let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
        Ok(())
    }

    let client = Client::try_default().await?;
    let test_lowversion_crd = v1alpha1::TestLowVersionCr::crd();
    let testcrd_v1 = v1::TestCr::crd();
    let testcrd_v2alpha1 = v2alpha1::TestCr::crd();
    let all_crds = vec![testcrd_v1.clone(), testcrd_v2alpha1.clone()];

    apply_crd(client.clone(), "testlowversioncrs.kube.rs", test_lowversion_crd).await?;
    // Merge the two TestCr versions into one CRD (v1 as the stored version).
    apply_crd(client.clone(), "testcrs.kube.rs", merge_crds(all_crds, "v1")?).await?;

    // run (almost) full discovery
    let discovery = Discovery::new(client.clone())
        // skip something in discovery (clux.dev crd being mutated in other tests)
        .exclude(&["rbac.authorization.k8s.io", "clux.dev"])
        .run()
        .await?;

    // check our custom resource first by resolving within groups
    assert!(discovery.has_group("kube.rs"), "missing group kube.rs");

    let group = discovery::group(&client, "kube.rs").await?;
    let resources = group.resources_by_stability();
    // TestCr is served at v1 and v2alpha1; the stable v1 must be selected.
    assert!(
        resources
            .iter()
            .any(|(ar, _)| ar.kind == "TestCr" && ar.version == "v1"),
        "wrong stable version"
    );
    // TestLowVersionCr exists only at v1alpha1; it must not be dropped just
    // because its best version is a pre-release.
    assert!(
        resources
            .iter()
            .any(|(ar, _)| ar.kind == "TestLowVersionCr" && ar.version == "v1alpha1"),
        "lost low version resource"
    );

    // cleanup
    let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
    crds.delete("testcrs.kube.rs", &DeleteParams::default()).await?;
    crds.delete("testlowversioncrs.kube.rs", &DeleteParams::default())
        .await?;
    Ok(())
}

#[tokio::test]
#[ignore] // needs cluster (will create await a pod)
#[cfg(all(feature = "runtime"))]
Expand Down

0 comments on commit 4bdfe02

Please sign in to comment.