Skip to content

Commit

Permalink
feat: add cascade deletion for local metadata (#3638)
Browse files Browse the repository at this point in the history
Added cascade deletion for local metadata.

Also, fixed a bug with deserialization failure for nested enums if the outer enum is tagged. More details here dtolnay/serde-yaml#378. Fixed by removing external tagging for versioned spec and added unit-tests that cover this case.

Also fixes #3632
  • Loading branch information
Alexander Galibey committed Nov 3, 2023
1 parent 3985fb9 commit 4d4151a
Show file tree
Hide file tree
Showing 13 changed files with 721 additions and 166 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Expand Up @@ -1163,7 +1163,7 @@ jobs:
- 6
replication:
- 1
run-mode: [local-k8]
run-mode: [local, local-k8]
steps:
- name: Checkout Source Code
uses: actions/checkout@v4
Expand All @@ -1182,11 +1182,13 @@ jobs:
path: ~/.fluvio/extensions

- name: Setup K8s Cluster
if: matrix.run-mode == 'local-k8'
run: |
curl -s https://raw.githubusercontent.com/rancher/k3d/main/install.sh | TAG=${{ env.K3D_VERSION }} bash
./k8-util/cluster/reset-k3d.sh
- name: Wait 15s for K3D Reset
if: matrix.run-mode == 'local-k8'
run: sleep 15

- name: Set up Fluvio Binaries
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -180,8 +180,8 @@ fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", de
fluvio-socket = { version = "0.14.3", path = "crates/fluvio-socket", default-features = false }
fluvio-spu-schema = { version = "0.14.5", path = "crates/fluvio-spu-schema", default-features = false }
fluvio-storage = { path = "crates/fluvio-storage" }
fluvio-stream-dispatcher = { version = "0.12.0", path = "crates/fluvio-stream-dispatcher" }
fluvio-stream-model = { version = "0.10.0", path = "crates/fluvio-stream-model", default-features = false }
fluvio-stream-dispatcher = { version = "0.13.0", path = "crates/fluvio-stream-dispatcher" }
fluvio-stream-model = { version = "0.11.0", path = "crates/fluvio-stream-model", default-features = false }
fluvio-types = { version = "0.4.4", path = "crates/fluvio-types", default-features = false }

# Used to make eyre faster on debug builds
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc/src/k8/objects/spu_k8_config.rs
Expand Up @@ -214,7 +214,7 @@ mod extended {
match ScK8Config::from(k8_obj.header.data) {
Ok(config) => match k8_obj.metadata.try_into() {
Ok(ctx_item) => {
let ctx = MetadataContext::new(ctx_item, None);
let ctx = MetadataContext::new(ctx_item);
Ok(
MetadataStoreObject::new("fluvio", config, FluvioConfigStatus {})
.with_context(ctx),
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-sc/src/services/private_api/private_server.rs
Expand Up @@ -19,6 +19,7 @@ use fluvio_controlplane::spu_api::update_spu::UpdateSpuRequest;
use fluvio_controlplane_metadata::message::Message;
use fluvio_stream_model::core::MetadataItem;
use fluvio_stream_model::store::ChangeListener;
use tracing::warn;
use tracing::{debug, info, trace, instrument, error};
use async_trait::async_trait;
use futures_util::stream::Stream;
Expand Down Expand Up @@ -267,7 +268,7 @@ where
None
}
} else {
error!("replica doesn't exist");
warn!("replica doesn't exist");
None
};

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-stream-dispatcher/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "fluvio-stream-dispatcher"
edition = "2021"
version = "0.12.0"
version = "0.13.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Fluvio Event Stream access"
repository = "https://github.com/infinyon/fluvio"
Expand Down
7 changes: 4 additions & 3 deletions crates/fluvio-stream-dispatcher/src/metadata/k8.rs
Expand Up @@ -118,7 +118,7 @@ impl<T: K8MetadataClient> MetadataClient<K8MetaItem> for T {
let (key, spec, _status, ctx) = value.parts();
let k8_spec: S::K8Spec = spec.into_k8();

let mut input_metadata = if let Some(parent_metadata) = ctx.owner() {
let mut input_metadata = if let Some(parent_metadata) = ctx.item().owner() {
debug!("owner exists");
let item_name = key.to_string();

Expand Down Expand Up @@ -458,10 +458,11 @@ mod tests {
let namespace = NameSpace::Named("ns1".to_string());
let key = "child".to_string();
let parent_key = "parent".to_string();
let meta = K8MetaItem::new(key.clone(), namespace.to_string());
let mut meta = K8MetaItem::new(key.clone(), namespace.to_string());
let parent_meta = K8MetaItem::new(parent_key.clone(), namespace.to_string());
meta.set_owner(parent_meta);

let ctx = fluvio_stream_model::store::k8::K8MetadataContext::new(meta, Some(parent_meta));
let ctx = fluvio_stream_model::store::k8::K8MetadataContext::new(meta);

let obj = MetadataStoreObject::new_with_context(key.clone(), TestSpec::default(), ctx);

Expand Down

0 comments on commit 4d4151a

Please sign in to comment.