Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/disk: some follow ups #4519

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/content/misc-disk.md
Expand Up @@ -19,6 +19,17 @@ Backup and restore, or repair procedures for data corruption are not provided
at this time.
{{< /info >}}

While it's possible to load data larger than the allotted memory resources into OPA
using disk storage, there are limitations to be aware of:

{{< danger >}}
[Bundles](../management-bundles/) are loaded into memory **entirely** even when
disk storage is used: the decompressed, parsed bundle content is then inserted
into the disk store.

It is planned to fix this limitation in the future.
{{< /danger >}}

## Partitions

Partitions determine how the JSON data is split up when stored in the
Expand Down Expand Up @@ -77,6 +88,11 @@ are performance critical.
To figure out suboptimal partitioning, please have a look at the exposed
metrics.

OPA stores some internal values (such as bundle metadata) in the data store,
under `/system`. Partitions for that part of the data store are managed by
OPA, and providing any overlapping partitions in the config will raise an
error.

## Metrics

Using the [REST API](../rest-api/), you can include the `?metrics` query string
Expand Down
8 changes: 4 additions & 4 deletions plugins/bundle/plugin_test.go
Expand Up @@ -143,19 +143,19 @@ func TestPluginOneShotDiskStorageMetrics(t *testing.T) {

// NOTE(sr): These assertions reflect the current behaviour only! Not prescriptive.
name := "disk_deleted_keys"
if exp, act := 1, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 3, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
name = "disk_written_keys"
if exp, act := 7, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 5, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
name = "disk_read_keys"
if exp, act := 13, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 10, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
name = "disk_read_bytes"
if exp, act := 346, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 171, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
for _, timer := range []string{
Expand Down
3 changes: 0 additions & 3 deletions server/server_test.go
Expand Up @@ -1032,7 +1032,6 @@ func TestCompileV1Observability(t *testing.T) {
"timer_rego_query_parse_ns",
"timer_server_handler_ns",
"counter_disk_read_keys",
"counter_disk_read_bytes",
"timer_disk_read_ns",
})
})
Expand Down Expand Up @@ -2460,7 +2459,6 @@ func TestDataMetricsEval(t *testing.T) {
testDataMetrics(t, f, "/data?metrics&partial", []string{
"counter_server_query_cache_hit",
"counter_disk_read_keys",
"counter_disk_read_bytes",
"timer_rego_input_parse_ns",
"timer_rego_query_eval_ns",
"timer_server_handler_ns",
Expand Down Expand Up @@ -3293,7 +3291,6 @@ func TestQueryV1(t *testing.T) {

assertMetricsExist(t, result.Metrics, []string{
"counter_disk_read_keys",
"counter_disk_read_bytes",
"timer_rego_query_compile_ns",
"timer_rego_query_eval_ns",
// "timer_server_handler_ns", // TODO(sr): we're not consistent about timing this?
Expand Down
39 changes: 35 additions & 4 deletions storage/disk/disk.go
Expand Up @@ -121,6 +121,10 @@ type metadata struct {
Partitions []storage.Path `json:"partitions"` // caller-supplied data layout
}

// systemPartition is the wildcard partition that New appends to the
// caller-supplied partitions automatically. No user-defined partition should
// apply to the /system path: New returns an error when the combined set of
// partitions is not disjoint.
const systemPartition = "/system/*"

// New returns a new disk-based store based on the provided options.
func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, opts Options) (*Store, error) {

Expand All @@ -135,6 +139,14 @@ func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer,
}
}

partitions = append(partitions, storage.MustParsePath(systemPartition))
if !partitions.IsDisjoint() {
return nil, &storage.Error{
Code: storage.InternalErr,
Message: fmt.Sprintf("system partitions are managed: %v", opts.Partitions),
}
}

db, err := badger.Open(badgerConfigFromOptions(opts).WithLogger(&wrap{logger}))
if err != nil {
return nil, wrapError(err)
Expand Down Expand Up @@ -473,13 +485,32 @@ func (db *Store) validatePartitions(ctx context.Context, txn *badger.Txn, existi
removedPartitions := oldPathSet.Diff(newPathSet)
addedPartitions := newPathSet.Diff(oldPathSet)

if len(removedPartitions) > 0 {
// It's OK to replace partitions with wildcard partitions that overlap them:
// REMOVED: /foo/bar
// ADDED: /foo/*
// and the like.
replaced := make(pathSet, 0)
replacements := make(pathSet, 0)
for _, removed := range removedPartitions {
for _, added := range addedPartitions {
if isMatchedBy(removed, added) {
replaced = append(replaced, removed)
replacements = append(replacements, added)
}
}
}

rest := removedPartitions.Diff(replaced)
if len(rest) > 0 {
return &storage.Error{
Code: storage.InternalErr,
Message: fmt.Sprintf("partitions are backwards incompatible (old: %v, new: %v, missing: %v)", oldPathSet.Sorted(), newPathSet.Sorted(), removedPartitions.Sorted())}
Message: fmt.Sprintf("partitions are backwards incompatible (old: %v, new: %v, missing: %v)", oldPathSet, newPathSet, rest)}
}

for _, path := range addedPartitions {
for _, path := range addedPartitions.Diff(replacements) {
if prefix, wildcard := hasWildcard(path); wildcard {
path = prefix
}
for i := len(path); i > 0; i-- {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔍 I'll double check tomorrow if this logic (not the change here, but what's in main) needs adjustments for patterned partitions. I think we are open to ignoring existing data that we should care about? Because the len(path) logic followed by a lookup will not take * into account at all.

The impact is when adding wildcard partitions to a database with existing data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've addressed this in c8f9305. I'll add a test case tomorrow.

key, err := db.pm.DataPath2Key(path[:i])
if err != nil {
Expand Down Expand Up @@ -527,7 +558,7 @@ func (db *Store) diagnostics(ctx context.Context, partitions pathSet, logger log
if logger.GetLevel() < logging.Debug {
return nil
}
if len(partitions) == 0 {
if len(partitions) == 1 { // '/system/*' is always present
logger.Warn("no partitions configured")
if err := db.logPrefixStatistics(ctx, storage.MustParsePath("/"), logger); err != nil {
return err
Expand Down
167 changes: 132 additions & 35 deletions storage/disk/disk_test.go
Expand Up @@ -188,6 +188,7 @@ func TestDataPartitioningValidation(t *testing.T) {
t.Fatal("unexpected code or message, got:", err)
}

// set up two partitions
s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/bar"),
storage.MustParsePath("/foo/baz"),
Expand All @@ -198,6 +199,7 @@ func TestDataPartitioningValidation(t *testing.T) {

closeFn(ctx, s)

// init with same settings: nothing wrong
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/baz"),
storage.MustParsePath("/foo/bar"),
Expand All @@ -208,6 +210,7 @@ func TestDataPartitioningValidation(t *testing.T) {

closeFn(ctx, s)

// adding another partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/baz"),
storage.MustParsePath("/foo/bar"),
Expand All @@ -217,6 +220,9 @@ func TestDataPartitioningValidation(t *testing.T) {
t.Fatal(err)
}

// We're writing data under the partitions: this affects how
// some partition changes are treated: if they don't affect existing
// data, they are accepted.
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo/corge"), "x")
if err != nil {
t.Fatal(err)
Expand All @@ -234,7 +240,8 @@ func TestDataPartitioningValidation(t *testing.T) {
storage.MustParsePath("/foo/bar"),
storage.MustParsePath("/foo/qux/corge"),
}})
if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (old: [/foo/bar /foo/baz /foo/qux], new: [/foo/bar /foo/baz /foo/qux/corge], missing: [/foo/qux])") {
if err == nil || !strings.Contains(err.Error(),
"partitions are backwards incompatible (old: [/foo/bar /foo/baz /foo/qux /system/*], new: [/foo/bar /foo/baz /foo/qux/corge /system/*], missing: [/foo/qux])") {
t.Fatal(err)
}

Expand Down Expand Up @@ -275,9 +282,95 @@ func TestDataPartitioningValidation(t *testing.T) {
}

closeFn(ctx, s)

// switching to wildcard partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)

// adding another partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/the/snow/*"),
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)

// switching to a partition with multiple wildcards
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/*/*/*"),
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)

// there is no going back
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/the/snow/*"),
storage.MustParsePath("/foo/*"),
}})
if err == nil || !strings.Contains(err.Error(),
"partitions are backwards incompatible (old: [/foo/* /fox/in/*/*/* /system/*], new: [/foo/* /fox/in/the/snow/* /system/*], missing: [/fox/in/*/*/*])",
) {
t.Fatal(err)
}
closeFn(ctx, s)

// Adding a wildcard partition requires that no content exists under its
// non-wildcard prefix: we open the db with the previously used partitions
// (including the one with multiple wildcards), write another key, and
// re-open with an extra wildcard partition covering that key.
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/*/*/*"),
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/peanutbutter/jelly"), true)
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/*/*/*"),
storage.MustParsePath("/peanutbutter/*"),
storage.MustParsePath("/foo/*"),
}})
if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (existing data: /peanutbutter)") {
t.Fatal("expected to find existing key but got:", err)
}
closeFn(ctx, s)
})
}

// TestDataPartitioningSystemPartitions verifies that New rejects every
// caller-supplied partition that overlaps the /system path, reporting an
// error that contains "system partitions are managed".
func TestDataPartitioningSystemPartitions(t *testing.T) {
	ctx := context.Background()
	// New is expected to fail on partition validation, so this directory
	// should never be used.
	dir := "unused"

	for _, part := range []string{
		"/system",
		"/system/*",
		"/system/a",
		"/system/a/b",
	} {
		// One subtest per partition: a failure names the offending partition
		// and does not hide the results for the remaining ones.
		t.Run(part, func(t *testing.T) {
			s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
				storage.MustParsePath(part),
			}})
			if err == nil {
				// Don't leak a store that was opened unexpectedly.
				s.Close(ctx)
				t.Fatalf("partition %q: expected error, got none", part)
			}
			if !strings.Contains(err.Error(), "system partitions are managed") {
				t.Fatalf("partition %q: unexpected error: %v", part, err)
			}
		})
	}
}

func TestDataPartitioningReadsAndWrites(t *testing.T) {

tests := []struct {
Expand Down Expand Up @@ -1155,43 +1248,47 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) {
}

// TestDataPartitioningWriteInvalidPatchError opens a store partitioned on
// /foo and asserts that three AddOp writes fail with a *storage.Error whose
// Code is storage.InvalidPatchErr: an array written to /foo, an object holding
// an array under "foo" written to /, and an array written to /.
//
// NOTE(review): this span appears to be a diff rendering without +/- markers,
// so the removed and the added version of the body are interleaved below.
func TestDataPartitioningWriteInvalidPatchError(t *testing.T) {
test.WithTempFS(map[string]string{}, func(dir string) {
ctx := context.Background()
s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo"),
}})
if err != nil {
t.Fatal(err)
}
defer s.Close(ctx)
// NOTE(review): pt is only used as the subtest name; the partition passed
// to New below is always "/foo". Presumably storage.MustParsePath(pt) was
// intended so that the "/*" case is exercised too -- confirm.
for _, pt := range []string{"/*", "/foo"} {
t.Run(pt, func(t *testing.T) {
test.WithTempFS(map[string]string{}, func(dir string) {
ctx := context.Background()
s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo"),
}})
if err != nil {
t.Fatal(err)
}
defer s.Close(ctx)

err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}

err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{"foo": [1,2,3]}`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{"foo": [1,2,3]}`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}

err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
})
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
})
})
}
}

func executeTestWrite(ctx context.Context, t *testing.T, s storage.Store, x testWrite) {
Expand Down