diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00beb794f9..950da9f953 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1163,7 +1163,7 @@ jobs: - 6 replication: - 1 - run-mode: [local-k8] + run-mode: [local, local-k8] steps: - name: Checkout Source Code uses: actions/checkout@v4 @@ -1182,11 +1182,13 @@ jobs: path: ~/.fluvio/extensions - name: Setup K8s Cluster + if: matrix.run-mode == 'local-k8' run: | curl -s https://raw.githubusercontent.com/rancher/k3d/main/install.sh | TAG=${{ env.K3D_VERSION }} bash ./k8-util/cluster/reset-k3d.sh - name: Wait 15s for K3D Reset + if: matrix.run-mode == 'local-k8' run: sleep 15 - name: Set up Fluvio Binaries diff --git a/Cargo.lock b/Cargo.lock index c7cbcee728..77127a2c21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3243,7 +3243,7 @@ dependencies = [ [[package]] name = "fluvio-stream-dispatcher" -version = "0.12.0" +version = "0.13.0" dependencies = [ "anyhow", "async-channel", @@ -3267,7 +3267,7 @@ dependencies = [ [[package]] name = "fluvio-stream-model" -version = "0.10.0" +version = "0.11.0" dependencies = [ "async-rwlock", "async-std", diff --git a/Cargo.toml b/Cargo.toml index ddbfc520ad..ad880ec6bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,8 +180,8 @@ fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", de fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false } fluvio-spu-schema = { version = "0.14.5", path = "crates/fluvio-spu-schema", default-features = false } fluvio-storage = { path = "crates/fluvio-storage" } -fluvio-stream-dispatcher = { version = "0.12.0", path = "crates/fluvio-stream-dispatcher" } -fluvio-stream-model = { version = "0.10.0", path = "crates/fluvio-stream-model", default-features = false } +fluvio-stream-dispatcher = { version = "0.13.0", path = "crates/fluvio-stream-dispatcher" } +fluvio-stream-model = { version = "0.11.0", path = "crates/fluvio-stream-model", default-features = false } fluvio-types = { version = "0.4.4", path = "crates/fluvio-types", default-features = false } # Used to make eyre faster on debug builds diff --git a/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs b/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs index 3d387e8cb1..46cf1dcdd2 100644 --- a/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs +++ b/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs @@ -214,7 +214,7 @@ mod extended { match ScK8Config::from(k8_obj.header.data) { Ok(config) => match k8_obj.metadata.try_into() { Ok(ctx_item) => { - let ctx = MetadataContext::new(ctx_item, None); + let ctx = MetadataContext::new(ctx_item); Ok( MetadataStoreObject::new("fluvio", config, FluvioConfigStatus {}) .with_context(ctx), diff --git a/crates/fluvio-sc/src/services/private_api/private_server.rs b/crates/fluvio-sc/src/services/private_api/private_server.rs index f92d35e68f..7d568cf95c 100644 --- a/crates/fluvio-sc/src/services/private_api/private_server.rs +++ b/crates/fluvio-sc/src/services/private_api/private_server.rs @@ -19,6 +19,7 @@ use fluvio_controlplane::spu_api::update_spu::UpdateSpuRequest; use fluvio_controlplane_metadata::message::Message; use fluvio_stream_model::core::MetadataItem; use fluvio_stream_model::store::ChangeListener; +use tracing::warn; use tracing::{debug, info, trace, instrument, error}; use async_trait::async_trait; use futures_util::stream::Stream; @@ -267,7 +268,7 @@ where None } } else { - error!("replica doesn't exist"); + warn!("replica doesn't exist"); None }; diff --git a/crates/fluvio-stream-dispatcher/Cargo.toml b/crates/fluvio-stream-dispatcher/Cargo.toml index 1420706026..71e7ca0725 100644 --- a/crates/fluvio-stream-dispatcher/Cargo.toml +++ b/crates/fluvio-stream-dispatcher/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-stream-dispatcher" edition = "2021" -version = "0.12.0" +version = "0.13.0" authors = ["Fluvio Contributors "] description = "Fluvio Event Stream access" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-stream-dispatcher/src/metadata/k8.rs b/crates/fluvio-stream-dispatcher/src/metadata/k8.rs index aac79aa495..0c47418b07 100644 --- a/crates/fluvio-stream-dispatcher/src/metadata/k8.rs +++ b/crates/fluvio-stream-dispatcher/src/metadata/k8.rs @@ -118,7 +118,7 @@ impl MetadataClient for T { let (key, spec, _status, ctx) = value.parts(); let k8_spec: S::K8Spec = spec.into_k8(); - let mut input_metadata = if let Some(parent_metadata) = ctx.owner() { + let mut input_metadata = if let Some(parent_metadata) = ctx.item().owner() { debug!("owner exists"); let item_name = key.to_string(); @@ -458,10 +458,11 @@ mod tests { let namespace = NameSpace::Named("ns1".to_string()); let key = "child".to_string(); let parent_key = "parent".to_string(); - let meta = K8MetaItem::new(key.clone(), namespace.to_string()); + let mut meta = K8MetaItem::new(key.clone(), namespace.to_string()); let parent_meta = K8MetaItem::new(parent_key.clone(), namespace.to_string()); + meta.set_owner(parent_meta); - let ctx = fluvio_stream_model::store::k8::K8MetadataContext::new(meta, Some(parent_meta)); + let ctx = fluvio_stream_model::store::k8::K8MetadataContext::new(meta); let obj = MetadataStoreObject::new_with_context(key.clone(), TestSpec::default(), ctx); diff --git a/crates/fluvio-stream-dispatcher/src/metadata/local.rs b/crates/fluvio-stream-dispatcher/src/metadata/local.rs index 35ccb5c1ce..5087201ca8 100644 --- a/crates/fluvio-stream-dispatcher/src/metadata/local.rs +++ b/crates/fluvio-stream-dispatcher/src/metadata/local.rs @@ -1,17 +1,17 @@ use std::{ path::{Path, PathBuf}, - collections::HashMap, + collections::{HashMap, hash_map::Entry}, sync::{Arc, atomic::AtomicU64}, any::Any, ffi::OsStr, }; -use anyhow::{Result, anyhow}; +use anyhow::{Result, anyhow, Context}; use async_channel::{Sender, Receiver, bounded}; use async_lock::{RwLock, RwLockUpgradableReadGuard}; use futures_util::{stream::BoxStream, StreamExt}; use serde::{Serialize, Deserialize, de::DeserializeOwned}; -use tracing::{warn, debug}; +use tracing::{warn, debug, trace}; use fluvio_stream_model::{ core::{MetadataItem, Spec, MetadataContext}, @@ -34,22 +34,14 @@ pub struct LocalMetadataStorage { pub struct LocalMetadataItem { id: String, revision: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + parent: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + children: Option>>, } pub type LocalStoreObject = MetadataStoreObject; -impl MetadataItem for LocalMetadataItem { - type UId = String; - - fn uid(&self) -> &Self::UId { - &self.id - } - - fn is_newer(&self, another: &Self) -> bool { - self.revision > another.revision - } -} - #[async_trait::async_trait] impl MetadataClient for LocalMetadataStorage { async fn retrieve_items( @@ -67,8 +59,16 @@ impl MetadataClient for LocalMetadataStorage { where S: K8ExtendedSpec, { + trace!(?metadata, "delete item"); let store = self.get_store::().await?; - store.delete_item(metadata).await + if let Some(item) = store.try_retrieve_item::(&metadata).await? { + if let Some(owner) = item.ctx().item().owner() { + self.unlink_parent::(owner, item.ctx().item()).await?; + } + self.delete_children(item).await?; + store.delete_item(&metadata).await + }; + Ok(()) } async fn finalize_delete_item(&self, metadata: LocalMetadataItem) -> Result<()> @@ -83,8 +83,12 @@ impl MetadataClient for LocalMetadataStorage { S: K8ExtendedSpec, ::Owner: K8ExtendedSpec, { + trace!(?value, "apply"); let store = self.get_store::().await?; value.ctx_mut().item_mut().id = value.key().to_string(); + if let Some(owner) = value.ctx().item().owner() { + self.link_parent::(owner, value.ctx().item()).await?; + } store.apply(value).await } @@ -94,6 +98,7 @@ impl MetadataClient for LocalMetadataStorage { { use std::str::FromStr; + trace!(?metadata, ?spec, "update spec"); let store = self.get_store::().await?; let item = match store.try_retrieve_item::(&metadata).await? { Some(mut item) => { @@ -106,7 +111,7 @@ impl MetadataClient for LocalMetadataStorage { anyhow!("failed to parse key from a string: {}", metadata.uid()) })?, spec, - MetadataContext::new(metadata, None), + MetadataContext::new(metadata), ), }; store.apply(item).await?; @@ -122,9 +127,10 @@ impl MetadataClient for LocalMetadataStorage { where S: K8ExtendedSpec, { + trace!(?key, ?spec, "update spec by key"); let metadata = LocalMetadataItem { id: key.to_string(), - revision: Default::default(), + ..Default::default() }; let store = self.get_store::().await?; let item = match store.try_retrieve_item::(&metadata).await? { @@ -132,9 +138,7 @@ impl MetadataClient for LocalMetadataStorage { item.set_spec(spec); item } - None => { - LocalStoreObject::new_with_context(key, spec, MetadataContext::new(metadata, None)) - } + None => LocalStoreObject::new_with_context(key, spec, MetadataContext::new(metadata)), }; store.apply(item).await } @@ -148,6 +152,7 @@ impl MetadataClient for LocalMetadataStorage { where S: K8ExtendedSpec, { + trace!(?metadata, ?status, "update status"); let store = self.get_store::().await?; let mut item = store.retrieve_item::(&metadata).await?; item.ctx_mut().set_item(metadata.clone()); @@ -173,6 +178,83 @@ impl MetadataClient for LocalMetadataStorage { } } +impl MetadataItem for LocalMetadataItem { + type UId = String; + + fn uid(&self) -> &Self::UId { + &self.id + } + + fn is_newer(&self, another: &Self) -> bool { + self.revision > another.revision + } + + fn owner(&self) -> Option<&Self> { + self.parent.as_ref().map(|p| p.as_ref()) + } + + fn set_owner(&mut self, owner: Self) { + self.parent = Some(Box::new(owner)); + } + + fn children(&self) -> Option<&HashMap>> { + self.children.as_ref() + } + + fn set_children(&mut self, children: HashMap>) { + self.children = Some(children); + } +} + +impl LocalMetadataItem { + pub fn new>(id: S) -> Self { + Self { + id: id.into(), + ..Default::default() + } + } + + pub fn with_parent>(id: S, parent: LocalMetadataItem) -> Self { + Self { + id: id.into(), + parent: Some(Box::new(parent)), + ..Default::default() + } + } + + pub fn with_revision(mut self, revision: u64) -> Self { + self.revision = revision; + self + } + + pub fn put_child>(&mut self, kind: S, child: LocalMetadataItem) { + let children = self.children.get_or_insert(Default::default()); + match children.entry(kind.into()) { + Entry::Occupied(mut entry) => { + let vec = entry.get_mut(); + if !vec.contains(&child) { + vec.push(child); + } + } + Entry::Vacant(entry) => { + entry.insert(vec![child]); + } + } + } + pub fn remove_child>(&mut self, kind: S, child: &LocalMetadataItem) { + let children = self.children.get_or_insert(Default::default()); + match children.entry(kind.into()) { + Entry::Occupied(mut entry) => { + entry.get_mut().retain(|i| !i.eq(child)); + if entry.get().is_empty() { + entry.remove_entry(); + } + } + Entry::Vacant(_) => {} + } + } +} + #[derive(Debug)] struct SpecStore { version: AtomicU64, @@ -214,6 +296,64 @@ impl LocalMetadataStorage { } }) } + + async fn delete_children(&self, item: LocalStoreObject) -> Result<()> { + if let Some(all) = item.ctx().item().children() { + for (kind, children) in all { + let child_store = self.get_store_by_key(kind).await?; + for child in children { + trace!(?item, ?child, "delete child"); + child_store.delete_item(child).await; + } + } + } + Ok(()) + } + + async fn link_parent( + &self, + parent: &LocalMetadataItem, + child: &LocalMetadataItem, + ) -> Result<()> { + trace!(?parent, ?child, "link parent"); + let parent_store = self.get_store::().await?; + parent_store + .mut_in_place::(parent.uid(), |parent_obj| { + parent_obj + .ctx_mut() + .item_mut() + .put_child(S::LABEL, child.clone()); + }) + .await?; + Ok(()) + } + + async fn unlink_parent( + &self, + parent: &LocalMetadataItem, + child: &LocalMetadataItem, + ) -> Result<()> { + trace!(?parent, ?child, "link parent"); + let parent_store = self.get_store::().await?; + parent_store + .mut_in_place::(parent.uid(), |parent_obj| { + parent_obj + .ctx_mut() + .item_mut() + .remove_child(S::LABEL, child); + }) + .await?; + Ok(()) + } + + async fn get_store_by_key(&self, key: &str) -> Result> { + self.stores + .read() + .await + .get(key) + .cloned() + .ok_or_else(|| anyhow!("store not found for key {key}")) + } } impl SpecStore { @@ -229,15 +369,13 @@ impl SpecStore { if !path.extension().eq(&Some(OsStr::new("yaml"))) { continue; } - match SpecPointer::load::(path) { - Ok((name, item)) => { - debug!(kind = S::LABEL, name, "loaded"); - data.insert(name, item); - } - Err(err) => { - warn!("skipped spec file: {err}"); - } - }; + let (name, item) = SpecPointer::load::(&path).context(format!( + "loading metadata '{}' from {}", + S::LABEL, + path.display() + ))?; + debug!(kind = S::LABEL, name, "loaded"); + data.insert(name, item); } let (sender, receiver) = bounded(MAX_UPDATES_CAPACITY); @@ -287,10 +425,10 @@ impl SpecStore { { self.try_retrieve_item::(metadata) .await? - .ok_or_else(|| anyhow!("{} not found", metadata.uid())) + .ok_or_else(|| anyhow!("'{}' not found", metadata.uid())) } - async fn delete_item(&self, metadata: LocalMetadataItem) -> Result<()> { + async fn delete_item(&self, metadata: &LocalMetadataItem) { let mut write = self.data.write().await; if let Some(removed) = write.remove(metadata.uid()) { removed.delete(); @@ -301,7 +439,6 @@ impl SpecStore { self.version .fetch_add(1, std::sync::atomic::Ordering::SeqCst); } - Ok(()) } async fn apply(&self, mut value: LocalStoreObject) -> Result<()> @@ -362,6 +499,21 @@ impl SpecStore { fn spec_file_name(&self, name: &str) -> PathBuf { self.path.join(format!("{name}.yaml")) } + + async fn mut_in_place(&self, key: &str, func: F) -> Result<()> + where + F: Fn(&mut LocalStoreObject), + { + if let Some(spec) = self.data.write().await.get_mut(key) { + let mut obj = spec.downcast::()?; + func(&mut obj); + spec.set(obj); + spec.flush::()?; + Ok(()) + } else { + anyhow::bail!("'{key}' not found"); + } + } } impl SpecPointer { @@ -387,13 +539,7 @@ impl SpecPointer { fn downcast_ref(&self) -> Result<&LocalStoreObject> { self.inner .downcast_ref::>() - .ok_or_else(|| { - anyhow::anyhow!( - "incompatible type {:?} for spec kind {}", - self.inner.type_id(), - S::LABEL, - ) - }) + .ok_or_else(|| anyhow::anyhow!("incompatible type for spec kind {}", S::LABEL,)) } fn downcast(&self) -> Result> { @@ -411,6 +557,11 @@ impl SpecPointer { serde_yaml::to_writer(std::fs::File::create(&self.path)?, &storage)?; Ok(()) } + + fn set(&mut self, obj: LocalStoreObject) { + self.revision = obj.ctx().item().revision; + self.inner = Arc::new(obj); + } } impl SpecUpdate { @@ -430,7 +581,6 @@ impl SpecUpdate { } #[derive(Debug, Serialize, Deserialize, PartialEq)] -#[serde(tag = "api-version")] #[serde(bound(deserialize = "S: DeserializeOwned"))] enum VersionedSpecStorage where @@ -439,6 +589,7 @@ where #[serde(rename = "1.0.0")] V1(SpecStorageV1), } + impl VersionedSpecStorage { fn meta(&self) -> &LocalMetadataItem { match self { @@ -453,10 +604,7 @@ struct SpecStorageV1 where S: Spec, { - #[serde(flatten)] meta: LocalMetadataItem, - #[serde(skip_serializing_if = "Option::is_none")] - parent: Option, key: String, status: S::Status, spec: S, @@ -473,11 +621,10 @@ impl TryFrom<&SpecPointer> for VersionedSpecStorage { ctx, } = value.downcast::()?; - let (meta, parent) = ctx.into_parts(); + let meta = ctx.into_inner(); Ok(Self::V1(SpecStorageV1 { meta, - parent, key: key.to_string(), status, spec, @@ -498,12 +645,11 @@ where VersionedSpecStorage::V1(storage) => { let SpecStorageV1 { meta, - parent, key, status, spec, } = storage; - let ctx = MetadataContext::new(meta, parent); + let ctx = MetadataContext::new(meta); let key: S::IndexKey = key .parse() .map_err(|_| anyhow!("failed to parse key from '{key}'"))?; @@ -522,7 +668,10 @@ mod tests { ops::{AddAssign, SubAssign}, }; - use crate::metadata::fixture::{TestSpec, TestStatus}; + use crate::metadata::fixture::{ + TestSpec, TestStatus, + parent::{ParentSpec, ParentStatus}, + }; use super::*; @@ -545,16 +694,16 @@ mod tests { assert_eq!( spec_file_content, - r#"api-version: 1.0.0 -id: meta -revision: 0 -parent: - id: parent + r#"!1.0.0 +meta: + id: meta revision: 0 key: meta status: '' spec: replica: 1 + replica_spec: !Computed + count: 1 "# ); @@ -661,7 +810,7 @@ spec: //then assert!(res.is_err()); - assert_eq!(res.unwrap_err().to_string(), "meta not found"); + assert_eq!(res.unwrap_err().to_string(), "'meta' not found"); drop(meta_folder) } @@ -672,7 +821,10 @@ spec: let meta_folder = tempfile::tempdir().expect("temp dir created"); let meta_store = LocalMetadataStorage::new(&meta_folder); let obj = default_test_store_obj(); - let spec = TestSpec { replica: 5 }; + let spec = TestSpec { + replica: 5, + ..Default::default() + }; //when meta_store.apply(obj.clone()).await.expect("applied"); @@ -701,7 +853,10 @@ spec: let meta_folder = tempfile::tempdir().expect("temp dir created"); let meta_store = LocalMetadataStorage::new(&meta_folder); let obj = default_test_store_obj(); - let spec = TestSpec { replica: 5 }; + let spec = TestSpec { + replica: 5, + ..Default::default() + }; //when meta_store @@ -729,7 +884,10 @@ spec: let meta_folder = tempfile::tempdir().expect("temp dir created"); let meta_store = LocalMetadataStorage::new(&meta_folder); let obj = default_test_store_obj(); - let spec = TestSpec { replica: 6 }; + let spec = TestSpec { + replica: 6, + ..Default::default() + }; //when meta_store.apply(obj.clone()).await.expect("applied"); @@ -758,7 +916,10 @@ spec: let meta_folder = tempfile::tempdir().expect("temp dir created"); let meta_store = LocalMetadataStorage::new(&meta_folder); let obj = default_test_store_obj(); - let spec = TestSpec { replica: 6 }; + let spec = TestSpec { + replica: 6, + ..Default::default() + }; //when meta_store @@ -819,6 +980,7 @@ spec: matches!(updates.get(1), Some(LSUpdate::Mod(obj)) if obj.status.to_string().eq("new status")) ); assert!(matches!(updates.get(2), Some(LSUpdate::Delete(deleted)) if deleted.eq(&obj.key))); + drop(meta_folder) } #[fluvio_future::test] @@ -840,6 +1002,7 @@ spec: res.unwrap_err().to_string(), "attempt to update by stale value: current version: 1, proposed: 0" ); + drop(meta_folder) } #[fluvio_future::test] @@ -873,6 +1036,7 @@ spec: res.unwrap_err().to_string(), "attempt to update by stale value: current version: 1, proposed: 0" ); + drop(meta_folder) } #[fluvio_future::test] @@ -881,7 +1045,10 @@ spec: let meta_folder = tempfile::tempdir().expect("temp dir created"); let meta_store = LocalMetadataStorage::new(&meta_folder); let obj = default_test_store_obj(); - let spec = TestSpec { replica: 5 }; + let spec = TestSpec { + replica: 5, + ..Default::default() + }; meta_store.apply(obj.clone()).await.expect("applied"); meta_store @@ -900,6 +1067,7 @@ spec: res.unwrap_err().to_string(), "attempt to update by stale value: current version: 1, proposed: 0" ); + drop(meta_folder) } #[fluvio_future::test] @@ -908,7 +1076,10 @@ spec: let meta_folder = tempfile::tempdir().expect("temp dir created"); let meta_store = LocalMetadataStorage::new(&meta_folder); let obj = default_test_store_obj(); - let spec = TestSpec { replica: 5 }; + let spec = TestSpec { + replica: 5, + ..Default::default() + }; meta_store.apply(obj.clone()).await.expect("applied"); meta_store @@ -923,6 +1094,7 @@ spec: //then assert!(res.is_ok()); + drop(meta_folder) } #[fluvio_future::test] @@ -972,6 +1144,7 @@ spec: matches!(updates.get(0), Some(LSUpdate::Mod(obj)) if obj.status.to_string().eq("new status2")) ); assert!(matches!(updates.get(1), Some(LSUpdate::Delete(deleted)) if deleted.eq(&obj.key))); + drop(meta_folder) } #[fluvio_future::test] @@ -1022,27 +1195,140 @@ spec: let updates2: Vec<_> = updates2.into_iter().flatten().flatten().collect(); assert_eq!(updates2.len(), 0); + drop(meta_folder) + } + + #[fluvio_future::test] + async fn test_cascade_children_deletion() { + //given + let meta_folder = tempfile::tempdir().expect("temp dir created"); + let meta_store = LocalMetadataStorage::new(&meta_folder); + let (parent, children) = test_parent_with_children(2); + meta_store + .apply(parent.clone()) + .await + .expect("applied parent"); + for child in children { + meta_store.apply(child).await.expect("applied child"); + } + //when + let before = meta_store + .retrieve_items::(&NameSpace::All) + .await + .expect("items"); + + meta_store + .delete_item::(parent.ctx().item().clone()) + .await + .expect("deleted parent"); + + let after = meta_store + .retrieve_items::(&NameSpace::All) + .await + .expect("items"); + + //then + assert_eq!(before.items.len(), 2); + assert!(after.items.is_empty()); + drop(meta_folder) + } + + #[fluvio_future::test] + async fn test_parent_linking() { + //given + let meta_folder = tempfile::tempdir().expect("temp dir created"); + let meta_store = LocalMetadataStorage::new(&meta_folder); + let (mut parent, mut children) = test_parent_with_children(1); + let child = children.remove(0); + parent.ctx_mut().item_mut().set_children(Default::default()); + meta_store + .apply(parent.clone()) + .await + .expect("applied parent"); + + //when + meta_store + .apply(child.clone()) + .await + .expect("applied child"); + + let parent_meta = meta_store + .retrieve_items::(&NameSpace::All) + .await + .expect("items") + .items + .remove(0) + .ctx_owned() + .into_inner(); + + assert_eq!(parent_meta.children().unwrap().len(), 1); + assert_eq!( + parent_meta + .children() + .unwrap() + .get(TestSpec::LABEL) + .expect("test spec children") + .len(), + 1 + ); + + assert!(parent_meta + .children() + .unwrap() + .get(TestSpec::LABEL) + .expect("test spec children") + .contains(child.ctx().item()),); + + meta_store + .delete_item::(child.ctx().item().clone()) + .await + .expect("deleted child"); + + //then + let parent_meta = meta_store + .retrieve_items::(&NameSpace::All) + .await + .expect("items") + .items + .remove(0) + .ctx_owned() + .into_inner(); + + assert!(parent_meta.children().unwrap().is_empty()); + drop(meta_folder) + } + + #[fluvio_future::test] + async fn test_parent_is_not_existed() { + //given + let meta_folder = tempfile::tempdir().expect("temp dir created"); + let meta_store = LocalMetadataStorage::new(&meta_folder); + let (_, mut children) = test_parent_with_children(1); + let child = children.remove(0); + + //when + let res = meta_store.apply(child).await; + + //then + assert!(res.is_err()); + assert_eq!(res.unwrap_err().to_string(), "'parent' not found"); + drop(meta_folder) } #[test] fn test_ser() { //given - let spec = TestSpec { replica: 1 }; - let meta = LocalMetadataItem { - id: "meta1".to_string(), - revision: 1, + let spec = TestSpec { + replica: 1, + ..Default::default() }; - let parent = Some(LocalMetadataItem { - id: "parent1".to_string(), - revision: 2, - }); + let meta = meta_with_parent(); let spec_storage = SpecStorageV1 { key: "key1".to_string(), status: TestStatus("status1".to_string()), spec, meta, - parent, }; //when @@ -1052,16 +1338,19 @@ spec: //then assert_eq!( str, - r#"api-version: 1.0.0 -id: meta1 -revision: 1 -parent: - id: parent1 - revision: 2 + r#"!1.0.0 +meta: + id: meta1 + revision: 1 + parent: + id: parent1 + revision: 2 key: key1 status: status1 spec: replica: 1 + replica_spec: !Computed + count: 1 "# ); } @@ -1069,16 +1358,19 @@ spec: #[test] fn test_deser() { //given - let input = r#"api-version: 1.0.0 -id: meta -revision: 2 -parent: - id: parent1 - revision: 3 + let input = r#"!1.0.0 +meta: + id: meta + revision: 2 + parent: + id: parent1 + revision: 0 key: key1 status: status3 spec: replica: 2 + replica_spec: !Computed + count: 1 "#; //when let parsed: VersionedSpecStorage = @@ -1088,21 +1380,119 @@ spec: assert_eq!( parsed, VersionedSpecStorage::V1(SpecStorageV1 { - meta: LocalMetadataItem { - id: "meta".to_string(), - revision: 2, - }, - parent: Some(LocalMetadataItem { - id: "parent1".to_string(), - revision: 3, - }), + meta: LocalMetadataItem::with_parent("meta", LocalMetadataItem::new("parent1")) + .with_revision(2), key: "key1".to_string(), status: TestStatus("status3".to_string()), - spec: TestSpec { replica: 2 } + spec: TestSpec { + replica: 2, + ..Default::default() + }, }) ) } + #[test] + fn test_serde_parent() { + //given + let spec = ParentSpec { replica: 1 }; + let mut meta = LocalMetadataItem::new("parent").with_revision(1); + let child1 = LocalMetadataItem::with_parent("child1", meta.clone()).with_revision(1); + let child2 = LocalMetadataItem::with_parent("child2", meta.clone()).with_revision(2); + + let children = [(TestSpec::LABEL.to_owned(), vec![child1, child2])].into(); + meta.set_children(children); + + let spec_storage = VersionedSpecStorage::V1(SpecStorageV1 { + key: "key1".to_string(), + status: ParentStatus("status1".to_string()), + spec, + meta, + }); + + //when + let str = serde_yaml::to_string(&spec_storage).expect("serialized"); + + //then + assert_eq!( + str, + r#"!1.0.0 +meta: + id: parent + revision: 1 + children: + TEST_SPEC: + - id: child1 + revision: 1 + parent: + id: parent + revision: 1 + - id: child2 + revision: 2 + parent: + id: parent + revision: 1 +key: key1 +status: status1 +spec: + replica: 1 +"# + ); + + let deser: VersionedSpecStorage = + serde_yaml::from_str(&str).expect("deserialized"); + assert_eq!(spec_storage, deser); + } + + #[test] + fn test_metadata_put_child() { + //given + let mut meta = LocalMetadataItem::new("parent1"); + + //when + meta.put_child("kind1", LocalMetadataItem::new("child1")); + meta.put_child("kind1", LocalMetadataItem::new("child1")); + meta.put_child("kind1", LocalMetadataItem::new("child2")); + meta.put_child("kind2", LocalMetadataItem::new("child1")); + + //then + assert!(meta.children().is_some()); + let mut chidlren = meta.children.take().unwrap(); + assert_eq!(chidlren.len(), 2); + + let kind1 = chidlren.remove("kind1").unwrap(); + assert_eq!(kind1.len(), 2); + assert!(kind1.contains(&LocalMetadataItem::new("child1"))); + assert!(kind1.contains(&LocalMetadataItem::new("child2"))); + + let kind2 = chidlren.remove("kind2").unwrap(); + assert_eq!(kind2.len(), 1); + assert!(kind2.contains(&LocalMetadataItem::new("child1"))); + } + + #[test] + fn test_metadata_remove_child() { + //given + let mut meta = LocalMetadataItem::new("parent1"); + meta.put_child("kind1", LocalMetadataItem::new("child1")); + meta.put_child("kind1", LocalMetadataItem::new("child2")); + meta.put_child("kind2", LocalMetadataItem::new("child1")); + + //when + meta.remove_child("kind1", &LocalMetadataItem::new("child1")); + meta.remove_child("kind2", &LocalMetadataItem::new("child1")); + meta.remove_child("kind3", &LocalMetadataItem::new("child1")); + + //then + assert!(meta.children().is_some()); + let mut chidlren = meta.children.take().unwrap(); + assert_eq!(chidlren.len(), 1); + + let kind1 = chidlren.remove("kind1").unwrap(); + assert_eq!(kind1.len(), 1); + assert!(kind1.contains(&LocalMetadataItem::new("child2"))); + } + fn default_test_store_obj() -> LocalStoreObject { test_store_obj("meta") } @@ -1111,16 +1501,55 @@ spec: let meta = LocalMetadataItem { id: key.to_string(), revision: 0, + ..Default::default() }; - let parent = Some(LocalMetadataItem { + let spec = TestSpec { + replica: 1, + ..Default::default() + }; + LocalStoreObject::new_with_context(meta.uid().to_string(), spec, MetadataContext::new(meta)) + } + + fn meta_with_parent() -> LocalMetadataItem { + let parent = LocalMetadataItem::new("parent1").with_revision(2); + LocalMetadataItem::with_parent("meta1", parent).with_revision(1) + } + + fn test_parent_with_children( + children_count: usize, + ) -> ( + LocalStoreObject, + Vec>, + ) { + let mut parent_meta = LocalMetadataItem { id: "parent".to_string(), revision: 0, - }); - let spec = TestSpec { replica: 1 }; - LocalStoreObject::new_with_context( - meta.uid().to_string(), - spec, - MetadataContext::new(meta, parent), - ) + ..Default::default() + }; + let children_meta: Vec = (0..children_count) + .map(|i| LocalMetadataItem { + id: format!("child{i}"), + revision: 1, + ..Default::default() + }) + .collect(); + let parent_spec = ParentSpec { replica: 1 }; + parent_meta.set_children([(TestSpec::LABEL.to_owned(), children_meta.clone())].into()); + let parent_ctx = MetadataContext::new(parent_meta.clone()); + let parent_obj = + LocalStoreObject::new_with_context(parent_meta.uid().clone(), parent_spec, parent_ctx); + + let children_objs = children_meta + .into_iter() + .map(|mut meta| { + meta.set_owner(parent_meta.clone()); + LocalStoreObject::new_with_context( + meta.uid().to_string(), + TestSpec::default(), + MetadataContext::new(meta), + ) + }) + .collect(); + (parent_obj, children_objs) } } diff --git a/crates/fluvio-stream-dispatcher/src/metadata/mod.rs b/crates/fluvio-stream-dispatcher/src/metadata/mod.rs index ece1487b33..6a3e6c693a 100644 --- a/crates/fluvio-stream-dispatcher/src/metadata/mod.rs +++ b/crates/fluvio-stream-dispatcher/src/metadata/mod.rs @@ -79,9 +79,24 @@ mod fixture { }; use serde::{Serialize, Deserialize}; + use self::parent::ParentSpec; + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub(crate) struct TestSpec { pub replica: usize, + pub replica_spec: TestReplicaSpec, + } + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub(crate) enum TestReplicaSpec { + Computed { count: usize }, + Assigned { limit: usize }, + } + + impl Default for TestReplicaSpec { + fn default() -> Self { + Self::Computed { count: 1 } + } } #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -90,7 +105,7 @@ mod fixture { impl Spec for TestSpec { const LABEL: &'static str = "TEST_SPEC"; type Status = TestStatus; - type Owner = Self; + type Owner = ParentSpec; type IndexKey = String; } @@ -163,6 +178,7 @@ mod fixture { fn from(value: TestK8Spec) -> Self { Self { replica: value.replica, + ..Default::default() } } } @@ -172,4 +188,104 @@ mod fixture { Self(value.0) } } + + pub mod parent { + use super::*; + + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub(crate) struct ParentSpec { + pub replica: usize, + } + + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub(crate) struct ParentStatus(pub String); + + impl Spec for ParentSpec { + const LABEL: &'static str = "PARENT_SPEC"; + type Status = ParentStatus; + type Owner = Self; + type IndexKey = String; + } + + impl Status for ParentStatus {} + + impl Display for ParentStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } + + impl K8ExtendedSpec for ParentSpec { + type K8Spec = ParentK8Spec; + + const FINALIZER: Option<&'static str> = Some("FINALIZER2"); + + fn convert_from_k8( + k8_obj: fluvio_stream_model::k8_types::K8Obj, + multi_namespace_context: bool, + ) -> std::result::Result< + MetadataStoreObject, + K8ConvertError, + > { + default_convert_from_k8(k8_obj, multi_namespace_context) + } + + fn convert_status_from_k8(status: Self::Status) -> ::Status { + ParentK8SpecStatus(status.0) + } + + fn into_k8(self) -> Self::K8Spec { + ParentK8Spec { + replica: self.replica, + } + } + } + + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub(crate) struct ParentK8Spec { + pub replica: usize, + } + + #[derive(Default, Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] + pub(crate) struct ParentK8SpecStatus(String); + + impl K8Status for ParentK8SpecStatus {} + + impl Display for ParentK8SpecStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } + + impl K8Spec for ParentK8Spec { + type Status = ParentK8SpecStatus; + type Header = DefaultHeader; + + fn metadata() -> &'static Crd { + &Crd { + group: "parent_test.fluvio", + version: "v1", + names: CrdNames { + kind: "parent_spec", + plural: "parent_specs", + singular: "parent_spec", + }, + } + } + } + + impl From for ParentSpec { + fn from(value: ParentK8Spec) -> Self { + Self { + replica: value.replica, + } + } + } + + impl From for ParentStatus { + fn from(value: ParentK8SpecStatus) -> Self { + Self(value.0) + } + } + } } diff --git a/crates/fluvio-stream-model/Cargo.toml b/crates/fluvio-stream-model/Cargo.toml index 30accf3ba7..993790d4ab 100644 --- a/crates/fluvio-stream-model/Cargo.toml +++ b/crates/fluvio-stream-model/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-stream-model" edition = "2021" -version = "0.10.0" +version = "0.11.0" authors = ["Fluvio Contributors "] description = "Fluvio Event Stream Model" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-stream-model/src/core.rs b/crates/fluvio-stream-model/src/core.rs index fdb47f4fe7..fddcac5386 100644 --- a/crates/fluvio-stream-model/src/core.rs +++ b/crates/fluvio-stream-model/src/core.rs @@ -33,6 +33,18 @@ mod context { fn get_labels(&self) -> HashMap { HashMap::new() } + + fn owner(&self) -> Option<&Self> { + Default::default() + } + + fn set_owner(&mut self, _owner: Self) {} + + fn children(&self) -> Option<&HashMap>> { + Default::default() + } + + fn set_children(&mut self, _children: HashMap>) {} } pub trait MetadataRevExtension: MetadataItem { @@ -67,18 +79,17 @@ mod context { #[derive(Default, Debug, Clone, Eq, PartialEq)] pub struct MetadataContext { item: C, - owner: Option, } impl From for MetadataContext { fn from(item: C) -> Self { - Self { item, owner: None } + Self { item } } } impl MetadataContext { - pub fn new(item: C, owner: Option) -> Self { - Self { item, owner } + pub fn new(item: C) -> Self { + Self::from(item) } pub fn item(&self) -> &C { @@ -97,15 +108,8 @@ mod context { self.item } - pub fn into_parts(self) -> (C, Option) { - (self.item, self.owner) - } - - pub fn owner(&self) -> Option<&C> { - self.owner.as_ref() - } - pub fn set_owner(&mut self, ctx: C) { - self.owner = Some(ctx); + pub fn into_inner(self) -> C { + self.item } } @@ -114,17 +118,15 @@ mod context { C: MetadataItem, { pub fn create_child(&self) -> Self { - Self { - item: C::default(), - owner: Some(self.item.clone()), - } + let mut item = C::default(); + item.set_owner(self.item().clone()); + Self { item } } - pub fn set_labels>(self, labels: Vec<(T, T)>) -> Self { - Self { - item: self.item.set_labels(labels), - owner: self.owner, - } + pub fn set_labels>(mut self, labels: Vec<(T, T)>) -> Self { + let item = self.item.set_labels(labels); + self.item = item; + self } } @@ -133,7 +135,7 @@ mod context { C: MetadataRevExtension, { pub fn next_rev(&self) -> Self { - Self::new(self.item.next_rev(), None) + Self::new(self.item.next_rev()) } } diff --git a/crates/fluvio-stream-model/src/store/k8.rs b/crates/fluvio-stream-model/src/store/k8.rs index 2fb880ca44..7fc6294a49 100644 --- a/crates/fluvio-stream-model/src/store/k8.rs +++ b/crates/fluvio-stream-model/src/store/k8.rs @@ -21,6 +21,7 @@ pub type K8MetadataContext = MetadataContext; pub struct K8MetaItem { revision: u64, inner: ObjectMeta, + owner: Option>, } /// for sake of comparison, we only care about couple of fields in the metadata @@ -44,6 +45,7 @@ impl K8MetaItem { Self { revision: 0, inner: ObjectMeta::new(name, name_space), + owner: Default::default(), } } @@ -54,29 +56,6 @@ impl K8MetaItem { pub fn revision(&self) -> u64 { self.revision } - - /// create owner if exists, only worry about first references - pub fn owner_owned(&self) -> Option { - if self.inner.owner_references.is_empty() { - None - } else { - if self.inner.owner_references.len() > 1 { - error!("too many owners: {:#?}", self.inner); - } - - let owner = &self.inner.owner_references[0]; - - Some(Self { - revision: 0, - inner: ObjectMeta { - name: owner.name.to_owned(), - namespace: self.namespace.to_owned(), - uid: owner.uid.to_owned(), - ..Default::default() - }, - }) - } - } } impl Deref for K8MetaItem { @@ -108,17 +87,24 @@ impl MetadataItem for K8MetaItem { self.inner.deletion_grace_period_seconds.is_some() } - fn set_labels>(self, labels: Vec<(T, T)>) -> Self { - Self { - revision: self.revision, - inner: self.inner.set_labels(labels), - } + fn set_labels>(mut self, labels: Vec<(T, T)>) -> Self { + let inner = self.inner.set_labels(labels); + self.inner = inner; + self } /// get string labels fn get_labels(&self) -> HashMap { self.inner.labels.clone() } + + fn owner(&self) -> Option<&Self> { + self.owner.as_ref().map(|w| w.as_ref()) + } + + fn set_owner(&mut self, owner: Self) { + self.owner = Some(Box::new(owner)) + } } impl TryFrom for K8MetaItem { @@ -129,13 +115,32 @@ impl TryFrom for K8MetaItem { return Ok(Self { revision: 0, inner: value, + ..Default::default() }); } let revision: u64 = value.resource_version.parse()?; + if value.owner_references.len() > 1 { + error!("too many owners: {value:#?}"); + } + let owner = if let Some(owner_ref) = value.owner_references.get(0) { + let inner = ObjectMeta { + name: owner_ref.name.to_owned(), + namespace: value.namespace.to_owned(), + uid: owner_ref.uid.to_owned(), + ..Default::default() + }; + Some(Box::new(Self { + inner, + ..Default::default() + })) + } else { + None + }; Ok(Self { revision, inner: value, + owner, }) } } @@ -200,9 +205,8 @@ where match ctx_item_result { Ok(ctx_item) => { // trace!("k8 revision: {}, meta revision: {}",ctx_item.revision(),ctx_item.inner().resource_version); - let owner = ctx_item.owner_owned(); Ok(MetadataStoreObject::new(key, local_spec, local_status) - .with_context(MetadataContext::new(ctx_item, owner))) + .with_context(MetadataContext::new(ctx_item))) } Err(err) => Err(K8ConvertError::KeyConvertionError(IoError::new( ErrorKind::InvalidData, diff --git a/crates/fluvio-stream-model/src/store/metadata.rs b/crates/fluvio-stream-model/src/store/metadata.rs index 5bdcda3dc4..934ba4385b 100644 --- a/crates/fluvio-stream-model/src/store/metadata.rs +++ b/crates/fluvio-stream-model/src/store/metadata.rs @@ -129,7 +129,7 @@ where /// check if metadata is owned by other pub fn is_owned(&self, uid: &C::UId) -> bool { - match self.ctx().owner() { + match self.ctx().item().owner() { Some(parent) => parent.uid() == uid, None => false, }