Skip to content

Commit

Permalink
Add support for CRD conversion
Browse files Browse the repository at this point in the history
Signed-off-by: Mikail Bagishov <bagishov.mikail@yandex.ru>
  • Loading branch information
MikailBag committed Aug 28, 2022
1 parent 251728e commit d8dc65c
Show file tree
Hide file tree
Showing 9 changed files with 866 additions and 38 deletions.
239 changes: 207 additions & 32 deletions examples/crd_derive_multi.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use std::convert::Infallible;

use k8s_openapi::{
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
WebhookConversion,
},
ByteString,
};
use kube::{
api::{Api, Patch, PatchParams},
core::crd::merge_crds,
core::{
conversion::{ConversionHandler, StarConverter},
crd::merge_crds,
},
runtime::wait::{await_condition, conditions},
Client, CustomResource, CustomResourceExt, ResourceExt,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tracing::*;
use warp::Filter;

mod v1 {
use super::*;
// spec that is forwards compatible with v2 (can upgrade by truncating)
#[derive(CustomResource, Serialize, Deserialize, Default, Debug, Clone, JsonSchema)]
#[kube(group = "kube.rs", version = "v1", kind = "ManyDerive", namespaced)]
pub struct ManyDeriveSpec {
Expand All @@ -20,26 +31,177 @@ mod v1 {
}
}
mod v2 {
// spec that is NOT backwards compatible with v1 (cannot retrieve oldprop if truncated)
use super::*;
#[derive(CustomResource, Serialize, Deserialize, Default, Debug, Clone, JsonSchema)]
#[kube(group = "kube.rs", version = "v2", kind = "ManyDerive", namespaced)]
pub struct ManyDeriveSpec {
pub name: String,
pub extra: Option<String>,
pub newprop: u32,
}
}

// As you can see, two CRD versions are not compatible on schema level, and default
// custom resource conversion strategy (which simply changes apiVersion)
// will lead to data loss. Therefore we will implement conversion webhook which allows us
// to seamlessly work with the objects using both api versions, allowing for safe and gradual migration.
mod conversion {
use kube::core::{NotUsed, Object};

// At first, we need to define internal, unversioned representation of our resource.
// it never leaks to the outer world, so it can evolve independently of published versions
// (as long as it is logically compatible with all versions).
#[derive(Clone)]
pub struct ManyDeriveSpec {
pub beautiful_name: String,
pub prop: u32,
}
type ManyDerive = Object<ManyDeriveSpec, NotUsed>;

pub struct RayV1;
impl kube::core::conversion::StarRay for RayV1 {
type Unversioned = ManyDerive;
type Versioned = super::v1::ManyDerive;

fn into_unversioned(&self, versioned: Self::Versioned) -> Result<Self::Unversioned, String> {
let spec = ManyDeriveSpec {
beautiful_name: versioned.spec.name,
prop: versioned.spec.oldprop,
};
Ok(Object {
metadata: versioned.metadata,
spec,
status: None,
types: None,
})
}

fn from_unversioned(&self, unversioned: Self::Unversioned) -> Result<Self::Versioned, String> {
let spec = super::v1::ManyDeriveSpec {
name: unversioned.spec.beautiful_name,
oldprop: unversioned.spec.prop,
};
Ok(super::v1::ManyDerive {
metadata: unversioned.metadata,
spec,
})
}
}

pub struct RayV2;
impl kube::core::conversion::StarRay for RayV2 {
type Unversioned = ManyDerive;
type Versioned = super::v2::ManyDerive;

fn into_unversioned(&self, versioned: Self::Versioned) -> Result<Self::Unversioned, String> {
let spec = ManyDeriveSpec {
beautiful_name: versioned.spec.name,
prop: versioned.spec.newprop,
};
Ok(Object {
metadata: versioned.metadata,
spec,
status: None,
types: None,
})
}

fn from_unversioned(&self, unversioned: Self::Unversioned) -> Result<Self::Versioned, String> {
let spec = super::v2::ManyDeriveSpec {
name: unversioned.spec.beautiful_name,
newprop: unversioned.spec.prop,
};
Ok(super::v2::ManyDerive {
metadata: unversioned.metadata,
spec,
})
}
}
}

// this function actually implements conversion service
async fn run_conversion_webhook() {
let star_converter = StarConverter::builder()
.add_ray(conversion::RayV1)
.add_ray(conversion::RayV2)
.build();
let handler = ConversionHandler::new(star_converter);

let routes = warp::path("convert")
.and(warp::body::json())
.and_then(move |req| {
let result = handler.handle(req);

async move { Ok::<_, Infallible>(warp::reply::json(&result)) }
})
.with(warp::trace::request());

// You must generate a certificate for the service / url.
// See admission_setup.sh as a starting point for how to do this, configuration of the
// conversion and admission webhooks is similar.
let addr = format!("{}:8443", std::env::var("WEBHOOK_BIND_IP").unwrap());
warp::serve(warp::post().and(routes))
.tls()
.cert_path(std::env::var("WEBHOOK_TLS_CRT").unwrap())
.key_path(std::env::var("WEBHOOK_TLS_KEY").unwrap())
.run(addr.parse::<std::net::SocketAddr>().unwrap())
.await;
}

async fn add_conversion_config(crd: &mut CustomResourceDefinition) -> anyhow::Result<()> {
// path to the CA root certificate which issued webhook serving certificate
let ca_crt = tokio::fs::read(std::env::var("CA_PATH").unwrap()).await?;
let mut client_config = WebhookClientConfig {
ca_bundle: Some(ByteString(ca_crt)),
service: None,
url: None,
};
// If you launched webhook outside the cluster (e.g. locally), set WEBHOOK_URL
// to its url (must end with '/convert', must be reachable from the apiserver).
if let Ok(url) = std::env::var("WEBHOOK_URL") {
client_config.url = Some(url);
} else {
// If you launched webhook in cluster (e.g. as a Deployment), create a
// service pointing to webhook pods, and set WEBHOOK_NAMESPACE and
// WEBHOOK_SVC to the namespace and name of the service.
// YOu may refer to crd_derive_multi.yaml for example manifests.
client_config.service = Some(ServiceReference {
namespace: std::env::var("WEBHOOK_NAMESPACE").unwrap(),
name: std::env::var("WEBHOOK_SVC").unwrap(),
port: Some(443),
path: Some("/convert".to_string()),
});
}
crd.spec.conversion = Some(CustomResourceConversion {
strategy: "Webhook".to_string(),
webhook: Some(WebhookConversion {
client_config: Some(client_config),
conversion_review_versions: vec!["v1".to_string()],
}),
});

Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

// to run this example, you need to launch two instances in parallel:
// one with RUN_WEBHOOK=1 - it will be conversion webhook (also set WEBHOOK_BIND_IP, WEBHOOK_TLS_CRT, WEBHOOK_TLS_KEY)
// one without - it will be example itself (also set CA_PATH, WEBHOOK_NAMESPACE and WEBHOOK_SVC or WEBHOOK_URL)
if std::env::var("RUN_WEBHOOK").is_ok() {
run_conversion_webhook().await;
return Ok(());
}

let client = Client::try_default().await?;
let ssapply = PatchParams::apply("crd_derive_multi").force();

let crd1 = v1::ManyDerive::crd();
let crd2 = v2::ManyDerive::crd();
let all_crds = vec![crd1.clone(), crd2.clone()];
let mut crd1 = v1::ManyDerive::crd();
let mut crd2 = v2::ManyDerive::crd();
add_conversion_config(&mut crd1).await?;
add_conversion_config(&mut crd2).await?;
let mut all_crds = vec![crd1.clone(), crd2.clone()];

// apply schema where v1 is the stored version
apply_crd(client.clone(), merge_crds(all_crds.clone(), "v1")?).await?;
Expand All @@ -48,42 +210,55 @@ async fn main() -> anyhow::Result<()> {
let v1api: Api<v1::ManyDerive> = Api::default_namespaced(client.clone());
let v2api: Api<v2::ManyDerive> = Api::default_namespaced(client.clone());

// create a v1 version
// create a v1 resource
let v1m = v1::ManyDerive::new("old", v1::ManyDeriveSpec {
name: "i am old".into(),
oldprop: 5,
});
let oldvarv1 = v1api.patch("old", &ssapply, &Patch::Apply(&v1m)).await?;
info!("old instance on v1: {:?}", oldvarv1.spec);
let oldvarv2 = v2api.get("old").await?;
info!("old instance on v2 truncates: {:?}", oldvarv2.spec);
info!("old instance on v2 not truncated: {:?}", oldvarv2.spec);
assert_eq!(oldvarv2.spec.newprop, 5); // no data loss

// create a v2 version
let v2m = v2::ManyDerive::new("new", v2::ManyDeriveSpec {
name: "i am new".into(),
extra: Some("hi".into()),
newprop: 4,
});
let newvarv2 = v2api.patch("new", &ssapply, &Patch::Apply(&v2m)).await?;
info!("new instance on v2 is force downgraded: {:?}", newvarv2.spec); // no extra field
let cannot_fetch_as_old = v1api.get("new").await.unwrap_err();
info!("cannot fetch new on v1: {:?}", cannot_fetch_as_old);

// apply schema upgrade
apply_crd(client.clone(), merge_crds(all_crds, "v2")?).await?;
info!(
"new instance on v2 was force downgraded on storage: {:?}",
newvarv2.spec
);
assert_eq!(newvarv2.spec.newprop, 4); // no data loss
let can_fetch_as_old = v1api.get("new").await.unwrap();
info!(
"fetched new instance using old api version: {:?}",
can_fetch_as_old
);

// nothing changed with existing objects without conversion
//let oldvarv1_upg = v1api.get("old").await?;
//info!("old instance unchanged on v1: {:?}", oldvarv1_upg.spec);
//let oldvarv2_upg = v2api.get("old").await?;
//info!("old instance unchanged on v2: {:?}", oldvarv2_upg.spec);
// as you can see, proper conversion allows us to access the same object under different
// apiVersions without any problems. For example, you may have custom controller which uses v1 api
// and admission webhook which uses v2.

// re-apply new now that v2 is stored gives us the extra properties
let newvarv2_2 = v2api.patch("new", &ssapply, &Patch::Apply(&v2m)).await?;
info!("new on v2 correct on reapply to v2: {:?}", newvarv2_2.spec);
// Now imagine that we upgraded all our clients to the v2 API
// and we want to drop support for the legacy v1 API (after all you see how many code we had to write
// for conversion).

// at first, we need to promote `v2` version as stored (so that all new or updated resources are persisted
// in the cluster storage using new version).
info!("Selecting v2 as storage version");
apply_crd(client.clone(), merge_crds(all_crds.clone(), "v2")?).await?;
// here we use `migrate_resources` utility function to migrate all previously stored objects.
info!("Running storage migration");
kube::api::migrate_resources(v2api.clone(), Api::all(client.clone()), &()).await?;
// and now we can apply CRD again without specifying v1, completely removing it.
info!("Removing v1");
all_crds.remove(0);
apply_crd(client.clone(), merge_crds(all_crds, "v2")?).await?;

// note we can apply old versions without them being truncated to the v2 schema
// in our case this means we cannot fetch them with our v1 schema (breaking change to not have oldprop)
// now it is impossible to use v1
let v1m2 = v1::ManyDerive::new("old", v1::ManyDeriveSpec {
name: "i am old2".into(),
oldprop: 5,
Expand All @@ -92,16 +267,16 @@ async fn main() -> anyhow::Result<()> {
.patch("old", &ssapply, &Patch::Apply(&v1m2))
.await
.unwrap_err();
info!("cannot get old on v1 anymore: {:?}", v1err); // mandatory field oldprop truncated
// ...but the change is still there:
info!("cannot get old on v1 anymore: {:?}", v1err);
// but objects which were initally stored as v1, were migrated to v2
// and now are available through the v2 API.
let old_still_there = v2api.get("old").await?;
assert_eq!(old_still_there.spec.name, "i am old2");
assert_eq!(old_still_there.spec.name, "i am old");

cleanup(client.clone()).await?;
cleanup(client).await?;
Ok(())
}


async fn apply_crd(client: Client, crd: CustomResourceDefinition) -> anyhow::Result<()> {
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
info!("Creating crd: {}", serde_yaml::to_string(&crd)?);
Expand Down
81 changes: 81 additions & 0 deletions examples/crd_derive_multi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
apiVersion: v1
kind: Namespace
metadata:
name: conversion-webhook-example
---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: conversion-webhook-example
name: conversion-webhook
spec:
selector:
matchLabels:
app: webhook
replicas: 1
template:
metadata:
labels:
app: webhook
spec:
terminationGracePeriodSeconds: 1
volumes:
- name: tls
secret:
secretName: conversions-webhook-tls
containers:
- name: main
image: # Specify image name here
imagePullPolicy: Always
env:
- name: WEBHOOK_BIND_IP
value: '0.0.0.0'
- name: RUN_WEBHOOK
value: '1'
- name: WEBHOOK_TLS_CRT
value: /pki/tls.crt
- name: WEBHOOK_TLS_KEY
value: /pki/tls.key
volumeMounts:
- name: tls
mountPath: /pki
---
apiVersion: v1
kind: Service
metadata:
namespace: conversion-webhook-example
name: conversion-webhook
spec:
selector:
app: webhook
type: ClusterIP
ports:
- port: 443
targetPort: 8443
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
namespace: conversion-webhook-example
name: conversion-webhook
rules:
- apiGroups: ['apiextensions.k8s.io']
resources: ['customresourcedefinitions']
verbs: ['*']
- apiGroups: ['kube.rs']
resources: ['*']
verbs: ['*']
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
namespace: conversion-webhook-example
name: conversion-webhook
subjects:
- apiGroup: rbac.authorization.k8s.io
kind: User
name: 'system:serviceaccount:conversion-webhook-example:default'
roleRef:
kind: ClusterRole
name: conversion-webhook
apiGroup: rbac.authorization.k8s.io

0 comments on commit d8dc65c

Please sign in to comment.