From 8f63046fb9c026dee669d262395c958dceb3f70c Mon Sep 17 00:00:00 2001 From: Ashutosh Narkar Date: Thu, 28 Jul 2022 23:08:40 -0700 Subject: [PATCH] storage+bundle: Clean old bundle data before new activation (#4944) If OPA has an activated bundle that owns all roots and a new bundle with empty roots is to be activated, the old bundle's data should first be erased from the store. Currently both the old and new data is kept in the store. This commit attempts to fix this by providing an indication to the truncate call about the scenario in which the root is to be overwritten. Fixes: #4940 Signed-off-by: Ashutosh Narkar --- bundle/store.go | 10 + bundle/store_test.go | 746 +++++++++++++++++++++++++++++++++++++++++ storage/disk/disk.go | 16 + storage/disk/txn.go | 6 + storage/inmem/inmem.go | 13 +- storage/interface.go | 3 + 6 files changed, 791 insertions(+), 3 deletions(-) diff --git a/bundle/store.go b/bundle/store.go index 4bb12c1179..570681a73c 100644 --- a/bundle/store.go +++ b/bundle/store.go @@ -727,6 +727,16 @@ func writeDataAndModules(ctx context.Context, store storage.Store, txn storage.T } } } else { + var rootOverwrite bool + for _, root := range *b.Manifest.Roots { + if root == "" { + rootOverwrite = true + break + } + } + + params.RootOverwrite = rootOverwrite + err := store.Truncate(ctx, txn, params, NewIterator(b.Raw)) if err != nil { return fmt.Errorf("store truncate failed for bundle '%s': %v", name, err) diff --git a/bundle/store_test.go b/bundle/store_test.go index dce93641d8..97659d0eb7 100644 --- a/bundle/store_test.go +++ b/bundle/store_test.go @@ -662,6 +662,752 @@ func TestBundleLazyModeLifecycle(t *testing.T) { mockStore.AssertValid(t) } +func TestBundleLazyModeLifecycleRawNoBundleRoots(t *testing.T) { + files := [][2]string{ + {"/a/b/c/data.json", "[1,2,3]"}, + {"/a/b/d/data.json", "true"}, + {"/a/b/y/data.yaml", `foo: 1`}, + {"/example/example.rego", `package example`}, + {"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}}`}, + {"/.manifest", `{"revision": "rev-1"}`}, + } + + buf := archive.MustWriteTarGz(files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("foo").WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + bundles := map[string]*Bundle{ + "bundle1": &bundle, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the bundle was activated + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err := ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` +{ + "a": { + "b": { + "c": [1,2,3], + "d": true, + "y": { + "foo": 1 + }, + "z": true + } + }, + "x": { + "y": true + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "foo" + } + } + } +} +` + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + files = [][2]string{ + {"/c/data.json", `{"hello": "world"}`}, + {"/.manifest", `{"revision": "rev-2"}`}, + } + + buf = archive.MustWriteTarGz(files) + loader = NewTarballLoaderWithBaseURL(buf, "") + br = NewCustomReader(loader).WithBundleEtag("bar").WithLazyLoadingMode(true) + + bundle, err = br.Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle, + } + + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + txn = storage.NewTransactionOrDie(ctx, mockStore) + + actual, err = mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "bar" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + +} + +func TestBundleLazyModeLifecycleRawNoBundleRootsDiskStorage(t *testing.T) { + ctx := context.Background() + + test.WithTempFS(nil, func(dir string) { + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + }) + if err != nil { + t.Fatal(err) + } + + compiler := ast.NewCompiler() + m := metrics.New() + + files := [][2]string{ + {"/a/b/c/data.json", "[1,2,3]"}, + {"/a/b/d/data.json", "true"}, + {"/a/b/y/data.yaml", `foo: 1`}, + {"/example/example.rego", `package example`}, + {"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}}`}, + {"/.manifest", `{"revision": "rev-1"}`}, + } + + buf := archive.MustWriteTarGz(files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("foo").WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle, + } + + txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the bundle was activated + txn = storage.NewTransactionOrDie(ctx, store) + names, err := ReadBundleNamesFromStore(ctx, store, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + actual, err := store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` +{ + "a": { + "b": { + "c": [1,2,3], + "d": true, + "y": { + "foo": 1 + }, + "z": true + } + }, + "x": { + "y": true + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "foo" + } + } + } +} +` + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + + files = [][2]string{ + {"/c/data.json", `{"hello": "world"}`}, + {"/.manifest", `{"revision": "rev-2"}`}, + } + + buf = archive.MustWriteTarGz(files) + loader = NewTarballLoaderWithBaseURL(buf, "") + br = NewCustomReader(loader).WithBundleEtag("bar").WithLazyLoadingMode(true) + + bundle, err = br.Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle, + } + + txn = storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err = store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "bar" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + }) +} + +func TestBundleLazyModeLifecycleNoBundleRoots(t *testing.T) { + ctx := context.Background() + mockStore := mock.New() + compiler := ast.NewCompiler() + m := metrics.New() + + mod1 := "package a\np = true" + + b := Bundle{ + Manifest: Manifest{Revision: "rev-1"}, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + "e": map[string]interface{}{ + "f": "bar", + }, + "x": []map[string]string{{"name": "john"}, {"name": "jane"}}, + }, + }, + Modules: []ModuleFile{ + { + Path: "a/policy.rego", + Raw: []byte(mod1), + Parsed: ast.MustParseModule(mod1), + }, + }, + Etag: "foo", + } + + var buf1 bytes.Buffer + if err := NewWriter(&buf1).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader := NewTarballLoaderWithBaseURL(&buf1, "") + bundle1, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, mockStore) + + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` + { + "a": { + "b": "foo", + "e": { + "f": "bar" + }, + "x": [{"name": "john"}, {"name": "jane"}] + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + // add a new bundle with no roots. this means all the data from the currently activated should be removed + b = Bundle{ + Manifest: Manifest{Revision: "rev-2"}, + Data: map[string]interface{}{ + "c": map[string]interface{}{ + "hello": "world", + }, + }, + Etag: "bar", + } + + var buf2 bytes.Buffer + if err := NewWriter(&buf2).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + + loader = NewTarballLoaderWithBaseURL(&buf2, "") + bundle2, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle2, + } + + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, mockStore) + + actual, err = mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) +} + +func TestBundleLazyModeLifecycleNoBundleRootsDiskStorage(t *testing.T) { + ctx := context.Background() + + test.WithTempFS(nil, func(dir string) { + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + }) + if err != nil { + t.Fatal(err) + } + + compiler := ast.NewCompiler() + m := metrics.New() + + mod1 := "package a\np = true" + + b := Bundle{ + Manifest: Manifest{Revision: "rev-1"}, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + "e": map[string]interface{}{ + "f": "bar", + }, + "x": []map[string]string{{"name": "john"}, {"name": "jane"}}, + }, + }, + Modules: []ModuleFile{ + { + Path: "a/policy.rego", + Raw: []byte(mod1), + Parsed: ast.MustParseModule(mod1), + }, + }, + Etag: "foo", + } + + var buf1 bytes.Buffer + if err := NewWriter(&buf1).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader := NewTarballLoaderWithBaseURL(&buf1, "") + bundle1, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + } + + txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err := store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` + { + "a": { + "b": "foo", + "e": { + "f": "bar" + }, + "x": [{"name": "john"}, {"name": "jane"}] + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + + // add a new bundle with no roots. this means all the data from the currently activated should be removed + b = Bundle{ + Manifest: Manifest{Revision: "rev-2"}, + Data: map[string]interface{}{ + "c": map[string]interface{}{ + "hello": "world", + }, + }, + Etag: "bar", + } + + var buf2 bytes.Buffer + if err := NewWriter(&buf2).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + + loader = NewTarballLoaderWithBaseURL(&buf2, "") + bundle2, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle2, + } + + txn = storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err = store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + }) +} + func TestDeltaBundleLazyModeLifecycleDiskStorage(t *testing.T) { ctx := context.Background() diff --git a/storage/disk/disk.go b/storage/disk/disk.go index 6a6ff05ea3..80f557bbac 100644 --- a/storage/disk/disk.go +++ b/storage/disk/disk.go @@ -263,6 +263,22 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s xid := atomic.AddUint64(&db.xid, uint64(1)) underlyingTxn := newTransaction(xid, true, underlying, params.Context, db.pm, db.partitions, nil) + if params.RootOverwrite { + newPath, ok := storage.ParsePathEscaped("/") + if !ok { + return fmt.Errorf("storage path invalid: %v", newPath) + } + + sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, newPath, map[string]interface{}{}) + if err != nil { + return wrapError(err) + } + + if sTxn != nil { + underlyingTxn = sTxn + } + } + for { var update *storage.Update diff --git a/storage/disk/txn.go b/storage/disk/txn.go index 68797e040a..95f962db5e 100644 --- a/storage/disk/txn.go +++ b/storage/disk/txn.go @@ -329,6 +329,12 @@ func (txn *transaction) partitionWriteMultiple(node *partitionTrie, path storage return nil, err } return txn.doPartitionWriteMultiple(node, path, bs, result) + case map[string]json.RawMessage: + bs, err := serialize(v) + if err != nil { + return nil, err + } + return txn.doPartitionWriteMultiple(node, path, bs, result) case json.RawMessage: return txn.doPartitionWriteMultiple(node, path, v, result) case []uint8: diff --git a/storage/inmem/inmem.go b/storage/inmem/inmem.go index 818bd17a96..770c30b6c8 100644 --- a/storage/inmem/inmem.go +++ b/storage/inmem/inmem.go @@ -97,10 +97,10 @@ func (db *store) NewTransaction(_ context.Context, params ...storage.Transaction } // Truncate implements the storage.Store interface. This method must be called within a transaction. -func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storage.TransactionParams, it storage.Iterator) error { +func (db *store) Truncate(ctx context.Context, txn storage.Transaction, params storage.TransactionParams, it storage.Iterator) error { var update *storage.Update var err error - var mergedData map[string]interface{} + mergedData := map[string]interface{}{} underlying, err := db.underlying(txn) if err != nil { @@ -150,7 +150,14 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storag return err } - // write merged data to store + if params.RootOverwrite { + newPath, ok := storage.ParsePathEscaped("/") + if !ok { + return fmt.Errorf("storage path invalid: %v", newPath) + } + return underlying.Write(storage.AddOp, newPath, mergedData) + } + for k := range mergedData { newPath, ok := storage.ParsePathEscaped("/" + k) if !ok { diff --git a/storage/interface.go b/storage/interface.go index 20981a78cb..b58d41ccff 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,6 +52,9 @@ type MakeDirer interface { // TransactionParams describes a new transaction. type TransactionParams struct { + // RootOverwrite indicates if this transaction will overwrite data at root. + RootOverwrite bool + // Write indicates if this transaction will perform any write operations. Write bool