diff --git a/docs/content/misc-disk.md b/docs/content/misc-disk.md index fb0208ef08..fbc0ba7dcd 100644 --- a/docs/content/misc-disk.md +++ b/docs/content/misc-disk.md @@ -19,6 +19,17 @@ Backup and restore, or repair procedures for data corruption are not provided at this time. {{< /info >}} +While it's possible to load data larger than the allotted memory resources into OPA +using disk storage, there are limitations to be aware of: + +{{< danger >}} +[Bundles](../management-bundles/) are loaded into memory **entirely** even when +disk storage is used: the decompressed, parsed bundle content is then inserted +into the disk store. + +It is planned to fix this limitation in the future. +{{< /danger >}} + ## Partitions Partitions determine how the JSON data is split up when stored in the @@ -77,6 +88,11 @@ are performance critical. To figure out suboptimal partitioning, please have a look at the exposed metrics. +OPA stores some internal values (such as bundle metadata) in the data store, +under `/system`. Partitions for that part of the data store are managed by +OPA, and providing any overlapping partitions in the config will raise an +error. + ## Metrics Using the [REST API](../rest-api/), you can include the `?metrics` query string diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index 4cce413326..ac55f7693d 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -143,19 +143,19 @@ func TestPluginOneShotDiskStorageMetrics(t *testing.T) { // NOTE(sr): These assertion reflect the current behaviour only! Not prescriptive. name := "disk_deleted_keys" - if exp, act := 1, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + if exp, act := 3, met.Counter(name).Value(); act.(uint64) != uint64(exp) { t.Errorf("%s: expected %v, got %v", name, exp, act) } name = "disk_written_keys" - if exp, act := 7, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + if exp, act := 5, met.Counter(name).Value(); act.(uint64) != uint64(exp) { t.Errorf("%s: expected %v, got %v", name, exp, act) } name = "disk_read_keys" - if exp, act := 13, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + if exp, act := 10, met.Counter(name).Value(); act.(uint64) != uint64(exp) { t.Errorf("%s: expected %v, got %v", name, exp, act) } name = "disk_read_bytes" - if exp, act := 346, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + if exp, act := 171, met.Counter(name).Value(); act.(uint64) != uint64(exp) { t.Errorf("%s: expected %v, got %v", name, exp, act) } for _, timer := range []string{ diff --git a/server/server_test.go b/server/server_test.go index d5cd2551b7..df0558b7d2 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1032,7 +1032,6 @@ func TestCompileV1Observability(t *testing.T) { "timer_rego_query_parse_ns", "timer_server_handler_ns", "counter_disk_read_keys", - "counter_disk_read_bytes", "timer_disk_read_ns", }) }) @@ -2460,7 +2459,6 @@ func TestDataMetricsEval(t *testing.T) { testDataMetrics(t, f, "/data?metrics&partial", []string{ "counter_server_query_cache_hit", "counter_disk_read_keys", - "counter_disk_read_bytes", "timer_rego_input_parse_ns", "timer_rego_query_eval_ns", "timer_server_handler_ns", @@ -3293,7 +3291,6 @@ func TestQueryV1(t *testing.T) { assertMetricsExist(t, result.Metrics, []string{ "counter_disk_read_keys", - "counter_disk_read_bytes", "timer_rego_query_compile_ns", "timer_rego_query_eval_ns", // "timer_server_handler_ns", // TODO(sr): we're not consistent about timing this? diff --git a/storage/disk/disk.go b/storage/disk/disk.go index 9e48c36bc2..2558b56718 100644 --- a/storage/disk/disk.go +++ b/storage/disk/disk.go @@ -121,6 +121,10 @@ type metadata struct { Partitions []storage.Path `json:"partitions"` // caller-supplied data layout } +// systemPartition is the partition we add automatically: no user-defined partition +// should apply to the /system path. +const systemPartition = "/system/*" + // New returns a new disk-based store based on the provided options. func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, opts Options) (*Store, error) { @@ -135,6 +139,14 @@ func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, } } + partitions = append(partitions, storage.MustParsePath(systemPartition)) + if !partitions.IsDisjoint() { + return nil, &storage.Error{ + Code: storage.InternalErr, + Message: fmt.Sprintf("system partitions are managed: %v", opts.Partitions), + } + } + db, err := badger.Open(badgerConfigFromOptions(opts).WithLogger(&wrap{logger})) if err != nil { return nil, wrapError(err) @@ -473,13 +485,32 @@ func (db *Store) validatePartitions(ctx context.Context, txn *badger.Txn, existi removedPartitions := oldPathSet.Diff(newPathSet) addedPartitions := newPathSet.Diff(oldPathSet) - if len(removedPartitions) > 0 { + // It's OK to replace partitions with wildcard partitions that overlap them: + // REMOVED: /foo/bar + // ADDED: /foo/* + // and the like. + replaced := make(pathSet, 0) + replacements := make(pathSet, 0) + for _, removed := range removedPartitions { + for _, added := range addedPartitions { + if isMatchedBy(removed, added) { + replaced = append(replaced, removed) + replacements = append(replacements, added) + } + } + } + + rest := removedPartitions.Diff(replaced) + if len(rest) > 0 { return &storage.Error{ Code: storage.InternalErr, - Message: fmt.Sprintf("partitions are backwards incompatible (old: %v, new: %v, missing: %v)", oldPathSet.Sorted(), newPathSet.Sorted(), removedPartitions.Sorted())} + Message: fmt.Sprintf("partitions are backwards incompatible (old: %v, new: %v, missing: %v)", oldPathSet, newPathSet, rest)} } - for _, path := range addedPartitions { + for _, path := range addedPartitions.Diff(replacements) { + if prefix, wildcard := hasWildcard(path); wildcard { + path = prefix + } for i := len(path); i > 0; i-- { key, err := db.pm.DataPath2Key(path[:i]) if err != nil { @@ -527,7 +558,7 @@ func (db *Store) diagnostics(ctx context.Context, partitions pathSet, logger log if logger.GetLevel() < logging.Debug { return nil } - if len(partitions) == 0 { + if len(partitions) == 1 { // '/system/*' is always present logger.Warn("no partitions configured") if err := db.logPrefixStatistics(ctx, storage.MustParsePath("/"), logger); err != nil { return err diff --git a/storage/disk/disk_test.go b/storage/disk/disk_test.go index 290878568a..29ce0df7ff 100644 --- a/storage/disk/disk_test.go +++ b/storage/disk/disk_test.go @@ -188,6 +188,7 @@ func TestDataPartitioningValidation(t *testing.T) { t.Fatal("unexpected code or message, got:", err) } + // set up two partitions s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/baz"), @@ -198,6 +199,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) + // init with same settings: nothing wrong s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), @@ -208,6 +210,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) + // adding another partition s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), @@ -217,6 +220,9 @@ func TestDataPartitioningValidation(t *testing.T) { t.Fatal(err) } + // We're writing data under the partitions: this affects how + // some partition changes are treated: if they don't affect existing + // data, they are accepted. err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo/corge"), "x") if err != nil { t.Fatal(err) @@ -234,7 +240,8 @@ func TestDataPartitioningValidation(t *testing.T) { storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux/corge"), }}) - if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (old: [/foo/bar /foo/baz /foo/qux], new: [/foo/bar /foo/baz /foo/qux/corge], missing: [/foo/qux])") { + if err == nil || !strings.Contains(err.Error(), + "partitions are backwards incompatible (old: [/foo/bar /foo/baz /foo/qux /system/*], new: [/foo/bar /foo/baz /foo/qux/corge /system/*], missing: [/foo/qux])") { t.Fatal(err) } @@ -275,9 +282,95 @@ func TestDataPartitioningValidation(t *testing.T) { } closeFn(ctx, s) + + // switching to wildcard partition + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/foo/*"), + }}) + if err != nil { + t.Fatal(err) + } + closeFn(ctx, s) + + // adding another partition + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/the/snow/*"), + storage.MustParsePath("/foo/*"), + }}) + if err != nil { + t.Fatal(err) + } + closeFn(ctx, s) + + // switching to a partition with multiple wildcards + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/*/*/*"), + storage.MustParsePath("/foo/*"), + }}) + if err != nil { + t.Fatal(err) + } + closeFn(ctx, s) + + // there is no going back + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/the/snow/*"), + storage.MustParsePath("/foo/*"), + }}) + if err == nil || !strings.Contains(err.Error(), + "partitions are backwards incompatible (old: [/foo/* /fox/in/*/*/* /system/*], new: [/foo/* /fox/in/the/snow/* /system/*], missing: [/fox/in/*/*/*])", + ) { + t.Fatal(err) + } + closeFn(ctx, s) + + // adding a wildcard partition requires no content on the non-wildcard prefix + // we open the db with previously used partitions, write another key, and + // re-open with an extra wildcard partition + // switching to a partition with multiple wildcards + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/*/*/*"), + storage.MustParsePath("/foo/*"), + }}) + if err != nil { + t.Fatal(err) + } + err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/peanutbutter/jelly"), true) + if err != nil { + t.Fatal(err) + } + closeFn(ctx, s) + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/fox/in/*/*/*"), + storage.MustParsePath("/peanutbutter/*"), + storage.MustParsePath("/foo/*"), + }}) + if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (existing data: /peanutbutter)") { + t.Fatal("expected to find existing key but got:", err) + } + closeFn(ctx, s) }) } +func TestDataPartitioningSystemPartitions(t *testing.T) { + ctx := context.Background() + dir := "unused" + + for _, part := range []string{ + "/system", + "/system/*", + "/system/a", + "/system/a/b", + } { + _, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath(part), + }}) + if err == nil || !strings.Contains(err.Error(), "system partitions are managed") { + t.Fatal(err) + } + } +} + func TestDataPartitioningReadsAndWrites(t *testing.T) { tests := []struct { @@ -1155,43 +1248,47 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { } func TestDataPartitioningWriteInvalidPatchError(t *testing.T) { - test.WithTempFS(map[string]string{}, func(dir string) { - ctx := context.Background() - s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ - storage.MustParsePath("/foo"), - }}) - if err != nil { - t.Fatal(err) - } - defer s.Close(ctx) + for _, pt := range []string{"/*", "/foo"} { + t.Run(pt, func(t *testing.T) { + test.WithTempFS(map[string]string{}, func(dir string) { + ctx := context.Background() + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/foo"), + }}) + if err != nil { + t.Fatal(err) + } + defer s.Close(ctx) - err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`[1,2,3]`))) - if err == nil { - t.Fatal("expected error") - } else if sErr, ok := err.(*storage.Error); !ok { - t.Fatal("expected storage error but got:", err) - } else if sErr.Code != storage.InvalidPatchErr { - t.Fatal("expected invalid patch error but got:", err) - } + err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`[1,2,3]`))) + if err == nil { + t.Fatal("expected error") + } else if sErr, ok := err.(*storage.Error); !ok { + t.Fatal("expected storage error but got:", err) + } else if sErr.Code != storage.InvalidPatchErr { + t.Fatal("expected invalid patch error but got:", err) + } - err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{"foo": [1,2,3]}`))) - if err == nil { - t.Fatal("expected error") - } else if sErr, ok := err.(*storage.Error); !ok { - t.Fatal("expected storage error but got:", err) - } else if sErr.Code != storage.InvalidPatchErr { - t.Fatal("expected invalid patch error but got:", err) - } + err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{"foo": [1,2,3]}`))) + if err == nil { + t.Fatal("expected error") + } else if sErr, ok := err.(*storage.Error); !ok { + t.Fatal("expected storage error but got:", err) + } else if sErr.Code != storage.InvalidPatchErr { + t.Fatal("expected invalid patch error but got:", err) + } - err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`[1,2,3]`))) - if err == nil { - t.Fatal("expected error") - } else if sErr, ok := err.(*storage.Error); !ok { - t.Fatal("expected storage error but got:", err) - } else if sErr.Code != storage.InvalidPatchErr { - t.Fatal("expected invalid patch error but got:", err) - } - }) + err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`[1,2,3]`))) + if err == nil { + t.Fatal("expected error") + } else if sErr, ok := err.(*storage.Error); !ok { + t.Fatal("expected storage error but got:", err) + } else if sErr.Code != storage.InvalidPatchErr { + t.Fatal("expected invalid patch error but got:", err) + } + }) + }) + } } func executeTestWrite(ctx context.Context, t *testing.T, s storage.Store, x testWrite) { diff --git a/storage/disk/paths.go b/storage/disk/paths.go index 04864087b5..f231608c93 100644 --- a/storage/disk/paths.go +++ b/storage/disk/paths.go @@ -7,6 +7,7 @@ package disk import ( "fmt" "sort" + "strings" "github.com/open-policy-agent/opa/storage" ) @@ -64,6 +65,22 @@ func (pm *pathMapper) DataPath2Key(path storage.Path) ([]byte, error) { type pathSet []storage.Path +func (ps pathSet) String() string { + if len(ps) == 0 { + return "[]" + } + buf := strings.Builder{} + buf.WriteRune('[') + for j, p := range ps.Sorted() { + if j != 0 { + buf.WriteRune(' ') + } + buf.WriteString(toString(p)) + } + buf.WriteRune(']') + return buf.String() +} + func (ps pathSet) IsDisjoint() bool { for i := range ps { for j := range ps { @@ -94,6 +111,23 @@ func hasPrefixWithWildcard(p, other storage.Path) bool { return true } +// isMatchedBy returns true if p starts with other, or is matched by it +// respecting wildcards _in other_ -- not in p. +func isMatchedBy(p, other storage.Path) bool { + if len(other) != len(p) { + return false + } + for i := range other { + if other[i] == pathWildcard { + continue + } + if p[i] != other[i] { + return false + } + } + return true +} + func (ps pathSet) Diff(other pathSet) pathSet { diff := pathSet{} for _, x := range ps {