diff --git a/bundle/store.go b/bundle/store.go index 4bb12c1179..570681a73c 100644 --- a/bundle/store.go +++ b/bundle/store.go @@ -727,6 +727,16 @@ func writeDataAndModules(ctx context.Context, store storage.Store, txn storage.T } } } else { + var rootOverwrite bool + for _, root := range *b.Manifest.Roots { + if root == "" { + rootOverwrite = true + break + } + } + + params.RootOverwrite = rootOverwrite + err := store.Truncate(ctx, txn, params, NewIterator(b.Raw)) if err != nil { return fmt.Errorf("store truncate failed for bundle '%s': %v", name, err) diff --git a/bundle/store_test.go b/bundle/store_test.go index dce93641d8..97659d0eb7 100644 --- a/bundle/store_test.go +++ b/bundle/store_test.go @@ -662,6 +662,752 @@ func TestBundleLazyModeLifecycle(t *testing.T) { mockStore.AssertValid(t) } +func TestBundleLazyModeLifecycleRawNoBundleRoots(t *testing.T) { + files := [][2]string{ + {"/a/b/c/data.json", "[1,2,3]"}, + {"/a/b/d/data.json", "true"}, + {"/a/b/y/data.yaml", `foo: 1`}, + {"/example/example.rego", `package example`}, + {"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}}`}, + {"/.manifest", `{"revision": "rev-1"}`}, + } + + buf := archive.MustWriteTarGz(files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("foo").WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + mockStore := mock.New() + + compiler := ast.NewCompiler() + m := metrics.New() + + bundles := map[string]*Bundle{ + "bundle1": &bundle, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the bundle was activated + txn = storage.NewTransactionOrDie(ctx, mockStore) + names, err := ReadBundleNamesFromStore(ctx, mockStore, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` +{ + "a": { + "b": { + "c": [1,2,3], + "d": true, + "y": { + "foo": 1 + }, + "z": true + } + }, + "x": { + "y": true + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "foo" + } + } + } +} +` + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + files = [][2]string{ + {"/c/data.json", `{"hello": "world"}`}, + {"/.manifest", `{"revision": "rev-2"}`}, + } + + buf = archive.MustWriteTarGz(files) + loader = NewTarballLoaderWithBaseURL(buf, "") + br = NewCustomReader(loader).WithBundleEtag("bar").WithLazyLoadingMode(true) + + bundle, err = br.Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle, + } + + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + txn = storage.NewTransactionOrDie(ctx, mockStore) + + actual, err = mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "bar" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + +} + +func TestBundleLazyModeLifecycleRawNoBundleRootsDiskStorage(t *testing.T) { + ctx := context.Background() + + test.WithTempFS(nil, func(dir string) { + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + }) + if err != nil { + t.Fatal(err) + } + + compiler := ast.NewCompiler() + m := metrics.New() + + files := [][2]string{ + {"/a/b/c/data.json", "[1,2,3]"}, + {"/a/b/d/data.json", "true"}, + {"/a/b/y/data.yaml", `foo: 1`}, + {"/example/example.rego", `package example`}, + {"/data.json", `{"x": {"y": true}, "a": {"b": {"z": true}}}}`}, + {"/.manifest", `{"revision": "rev-1"}`}, + } + + buf := archive.MustWriteTarGz(files) + loader := NewTarballLoaderWithBaseURL(buf, "") + br := NewCustomReader(loader).WithBundleEtag("foo").WithLazyLoadingMode(true) + + bundle, err := br.Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle, + } + + txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the bundle was activated + txn = storage.NewTransactionOrDie(ctx, store) + names, err := ReadBundleNamesFromStore(ctx, store, txn) + if err != nil { + t.Fatal(err) + } + + if len(names) != len(bundles) { + t.Fatalf("expected %d bundles in store, found %d", len(bundles), len(names)) + } + for _, name := range names { + if _, ok := bundles[name]; !ok { + t.Fatalf("unexpected bundle name found in store: %s", name) + } + } + + for bundleName, bundle := range bundles { + for modName := range bundle.ParsedModules(bundleName) { + if _, ok := compiler.Modules[modName]; !ok { + t.Fatalf("expected module %s from bundle %s to have been compiled", modName, bundleName) + } + } + } + + actual, err := store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` +{ + "a": { + "b": { + "c": [1,2,3], + "d": true, + "y": { + "foo": 1 + }, + "z": true + } + }, + "x": { + "y": true + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "foo" + } + } + } +} +` + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + + files = [][2]string{ + {"/c/data.json", `{"hello": "world"}`}, + {"/.manifest", `{"revision": "rev-2"}`}, + } + + buf = archive.MustWriteTarGz(files) + loader = NewTarballLoaderWithBaseURL(buf, "") + br = NewCustomReader(loader).WithBundleEtag("bar").WithLazyLoadingMode(true) + + bundle, err = br.Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle, + } + + txn = storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err = store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "bar" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + }) +} + +func TestBundleLazyModeLifecycleNoBundleRoots(t *testing.T) { + ctx := context.Background() + mockStore := mock.New() + compiler := ast.NewCompiler() + m := metrics.New() + + mod1 := "package a\np = true" + + b := Bundle{ + Manifest: Manifest{Revision: "rev-1"}, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + "e": map[string]interface{}{ + "f": "bar", + }, + "x": []map[string]string{{"name": "john"}, {"name": "jane"}}, + }, + }, + Modules: []ModuleFile{ + { + Path: "a/policy.rego", + Raw: []byte(mod1), + Parsed: ast.MustParseModule(mod1), + }, + }, + Etag: "foo", + } + + var buf1 bytes.Buffer + if err := NewWriter(&buf1).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader := NewTarballLoaderWithBaseURL(&buf1, "") + bundle1, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + } + + txn := storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, mockStore) + + actual, err := mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` + { + "a": { + "b": "foo", + "e": { + "f": "bar" + }, + "x": [{"name": "john"}, {"name": "jane"}] + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) + + // add a new bundle with no roots. this means all the data from the currently activated should be removed + b = Bundle{ + Manifest: Manifest{Revision: "rev-2"}, + Data: map[string]interface{}{ + "c": map[string]interface{}{ + "hello": "world", + }, + }, + Etag: "bar", + } + + var buf2 bytes.Buffer + if err := NewWriter(&buf2).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + + loader = NewTarballLoaderWithBaseURL(&buf2, "") + bundle2, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle2, + } + + txn = storage.NewTransactionOrDie(ctx, mockStore, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: mockStore, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = mockStore.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, mockStore) + + actual, err = mockStore.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + mockStore.Abort(ctx, txn) +} + +func TestBundleLazyModeLifecycleNoBundleRootsDiskStorage(t *testing.T) { + ctx := context.Background() + + test.WithTempFS(nil, func(dir string) { + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + }) + if err != nil { + t.Fatal(err) + } + + compiler := ast.NewCompiler() + m := metrics.New() + + mod1 := "package a\np = true" + + b := Bundle{ + Manifest: Manifest{Revision: "rev-1"}, + Data: map[string]interface{}{ + "a": map[string]interface{}{ + "b": "foo", + "e": map[string]interface{}{ + "f": "bar", + }, + "x": []map[string]string{{"name": "john"}, {"name": "jane"}}, + }, + }, + Modules: []ModuleFile{ + { + Path: "a/policy.rego", + Raw: []byte(mod1), + Parsed: ast.MustParseModule(mod1), + }, + }, + Etag: "foo", + } + + var buf1 bytes.Buffer + if err := NewWriter(&buf1).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + loader := NewTarballLoaderWithBaseURL(&buf1, "") + bundle1, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles := map[string]*Bundle{ + "bundle1": &bundle1, + } + + txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err := store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw := ` + { + "a": { + "b": "foo", + "e": { + "f": "bar" + }, + "x": [{"name": "john"}, {"name": "jane"}] + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-1", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected := loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + + // add a new bundle with no roots. this means all the data from the currently activated should be removed + b = Bundle{ + Manifest: Manifest{Revision: "rev-2"}, + Data: map[string]interface{}{ + "c": map[string]interface{}{ + "hello": "world", + }, + }, + Etag: "bar", + } + + var buf2 bytes.Buffer + if err := NewWriter(&buf2).UseModulePath(true).Write(b); err != nil { + t.Fatal("Unexpected error:", err) + } + + loader = NewTarballLoaderWithBaseURL(&buf2, "") + bundle2, err := NewCustomReader(loader).WithLazyLoadingMode(true).WithBundleName("bundle1").Read() + if err != nil { + t.Fatal(err) + } + + bundles = map[string]*Bundle{ + "bundle1": &bundle2, + } + + txn = storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + + err = Activate(&ActivateOpts{ + Ctx: ctx, + Store: store, + Txn: txn, + Compiler: compiler, + Metrics: m, + Bundles: bundles, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + err = store.Commit(ctx, txn) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + // Ensure the patches were applied + txn = storage.NewTransactionOrDie(ctx, store) + + actual, err = store.Read(ctx, txn, storage.MustParsePath("/")) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + expectedRaw = ` + { + "c": { + "hello": "world" + }, + "system": { + "bundles": { + "bundle1": { + "manifest": { + "revision": "rev-2", + "roots": [""] + }, + "etag": "" + } + } + } + }` + + expected = loadExpectedSortedResult(expectedRaw) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %v, got %v", expectedRaw, string(util.MustMarshalJSON(actual))) + } + + // Stop the "read" transaction + store.Abort(ctx, txn) + }) +} + func TestDeltaBundleLazyModeLifecycleDiskStorage(t *testing.T) { ctx := context.Background() diff --git a/storage/disk/disk.go b/storage/disk/disk.go index 6a6ff05ea3..80f557bbac 100644 --- a/storage/disk/disk.go +++ b/storage/disk/disk.go @@ -263,6 +263,22 @@ func (db *Store) Truncate(ctx context.Context, txn storage.Transaction, params s xid := atomic.AddUint64(&db.xid, uint64(1)) underlyingTxn := newTransaction(xid, true, underlying, params.Context, db.pm, db.partitions, nil) + if params.RootOverwrite { + newPath, ok := storage.ParsePathEscaped("/") + if !ok { + return fmt.Errorf("storage path invalid: %v", newPath) + } + + sTxn, err := db.doTruncateData(ctx, underlyingTxn, newDB, params, newPath, map[string]interface{}{}) + if err != nil { + return wrapError(err) + } + + if sTxn != nil { + underlyingTxn = sTxn + } + } + for { var update *storage.Update diff --git a/storage/disk/txn.go b/storage/disk/txn.go index 68797e040a..95f962db5e 100644 --- a/storage/disk/txn.go +++ b/storage/disk/txn.go @@ -329,6 +329,12 @@ func (txn *transaction) partitionWriteMultiple(node *partitionTrie, path storage return nil, err } return txn.doPartitionWriteMultiple(node, path, bs, result) + case map[string]json.RawMessage: + bs, err := serialize(v) + if err != nil { + return nil, err + } + return txn.doPartitionWriteMultiple(node, path, bs, result) case json.RawMessage: return txn.doPartitionWriteMultiple(node, path, v, result) case []uint8: diff --git a/storage/inmem/inmem.go b/storage/inmem/inmem.go index 818bd17a96..770c30b6c8 100644 --- a/storage/inmem/inmem.go +++ b/storage/inmem/inmem.go @@ -97,10 +97,10 @@ func (db *store) NewTransaction(_ context.Context, params ...storage.Transaction } // Truncate implements the storage.Store interface. This method must be called within a transaction. -func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storage.TransactionParams, it storage.Iterator) error { +func (db *store) Truncate(ctx context.Context, txn storage.Transaction, params storage.TransactionParams, it storage.Iterator) error { var update *storage.Update var err error - var mergedData map[string]interface{} + mergedData := map[string]interface{}{} underlying, err := db.underlying(txn) if err != nil { @@ -150,7 +150,14 @@ func (db *store) Truncate(ctx context.Context, txn storage.Transaction, _ storag return err } - // write merged data to store + if params.RootOverwrite { + newPath, ok := storage.ParsePathEscaped("/") + if !ok { + return fmt.Errorf("storage path invalid: %v", newPath) + } + return underlying.Write(storage.AddOp, newPath, mergedData) + } + for k := range mergedData { newPath, ok := storage.ParsePathEscaped("/" + k) if !ok { diff --git a/storage/interface.go b/storage/interface.go index 20981a78cb..b58d41ccff 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,6 +52,9 @@ type MakeDirer interface { // TransactionParams describes a new transaction. type TransactionParams struct { + // RootOverwrite indicates if this transaction will overwrite data at root. + RootOverwrite bool + // Write indicates if this transaction will perform any write operations. Write bool