storage/disk: wildcard partition validation, docs caveat (open-policy-agent#4519)

A bunch of smaller follow-up tasks to open-policy-agent#4381.

* storage/disk_test: check invalid patches with wildcard partition, too
* docs/disk: add caveat re: bundles loaded into memory
* storage/disk: auto-manage /system partitions

If user-provided partitions overlap /system, we'll error out.

* storage/disk: pretty-print partitions with "*" instead of %2A
* storage/disk: respect wildcard-replacement in partition validation

It is now allowed to replace a partition like

    /foo/bar

with

    /foo/*

and this also works when multiple wildcards are used.

Caveats:

You cannot add a wildcard partition like /*/*, since it would overlap
the managed "/system/*" partition.

When attempting to go back from /foo/* to /foo/bar, an error is
raised _unconditionally_ -- we could check the existing data, but
currently don't.
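
To make the overlap caveat concrete, here is a minimal sketch of the
idea -- the helper name is made up for illustration; the commit's actual
check is the pathSet.IsDisjoint call visible in the disk.go diff below:

    package main

    import "fmt"

    // overlaps reports whether two partitions can match the same path,
    // treating "*" as matching any single segment. Under this rule,
    // /*/* overlaps the managed /system/* partition.
    func overlaps(a, b []string) bool {
        n := len(a)
        if len(b) < n {
            n = len(b)
        }
        for i := 0; i < n; i++ {
            if a[i] != b[i] && a[i] != "*" && b[i] != "*" {
                return false
            }
        }
        return true
    }

    func main() {
        fmt.Println(overlaps([]string{"*", "*"}, []string{"system", "*"}))   // true
        fmt.Println(overlaps([]string{"foo", "*"}, []string{"system", "*"})) // false
    }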

* storage/disk: check prefix when adding wildcard partitions

The previous check could falsely report that there is no problem when
adding a wildcard partition: looking up "/foo/*" with '*' interpreted not
as a wildcard but as a literal string yields not-found, even if there is
data under /foo/.

Now, we check the prefix up to the first wildcard. That is more cautious
than theoretically necessary, but safe.
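
A rough sketch of that prefix computation (hypothetical helper name and
signature; the diff below uses a hasWildcard helper to the same effect):

    package main

    import "fmt"

    // prefixUntilWildcard returns the segments before the first "*".
    // For /foo/*/qux that is /foo: any data found under this prefix is
    // enough to reject the new wildcard partition.
    func prefixUntilWildcard(path []string) []string {
        for i, seg := range path {
            if seg == "*" {
                return path[:i]
            }
        }
        return path
    }

    func main() {
        fmt.Println(prefixUntilWildcard([]string{"foo", "*", "qux"})) // [foo]
    }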

Signed-off-by: Stephan Renatus <stephan.renatus@gmail.com>
srenatus authored and rokkiter committed Apr 18, 2022
1 parent df978e6 commit 5a6c139
Showing 6 changed files with 221 additions and 46 deletions.
16 changes: 16 additions & 0 deletions docs/content/misc-disk.md
@@ -19,6 +19,17 @@ Backup and restore, or repair procedures for data corruption are not provided
at this time.
{{< /info >}}

While it's possible to load data larger than the allotted memory resources into OPA
using disk storage, there are limitations to be aware of:

{{< danger >}}
[Bundles](../management-bundles/) are loaded into memory **entirely** even when
disk storage is used: the decompressed, parsed bundle content is then inserted
into the disk store.

It is planned to fix this limitation in the future.
{{< /danger >}}

## Partitions

Partitions determine how the JSON data is split up when stored in the
@@ -77,6 +88,11 @@ are performance critical.
To figure out suboptimal partitioning, please have a look at the exposed
metrics.

OPA stores some internal values (such as bundle metadata) in the data store,
under `/system`. Partitions for that part of the data store are managed by
OPA, and providing any overlapping partitions in the config will raise an
error.

## Metrics

Using the [REST API](../rest-api/), you can include the `?metrics` query string
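
As a hedged illustration of the managed /system partition described in
the docs change above (mirroring TestDataPartitioningSystemPartitions
further down; the directory value is an arbitrary assumption):

    package main

    import (
        "context"
        "fmt"

        "github.com/open-policy-agent/opa/logging"
        "github.com/open-policy-agent/opa/storage"
        "github.com/open-policy-agent/opa/storage/disk"
    )

    func main() {
        ctx := context.Background()
        // Any user-supplied partition touching /system overlaps the
        // managed /system/* partition, so New is expected to fail with
        // a "system partitions are managed" error.
        _, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{
            Dir: "/tmp/opa-disk-example", // assumption: any writable directory
            Partitions: []storage.Path{
                storage.MustParsePath("/system/a"),
            },
        })
        fmt.Println(err)
    }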
8 changes: 4 additions & 4 deletions plugins/bundle/plugin_test.go
@@ -143,19 +143,19 @@ func TestPluginOneShotDiskStorageMetrics(t *testing.T) {

// NOTE(sr): These assertions reflect the current behaviour only! Not prescriptive.
name := "disk_deleted_keys"
if exp, act := 1, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 3, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
name = "disk_written_keys"
if exp, act := 7, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 5, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
name = "disk_read_keys"
if exp, act := 13, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 10, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
name = "disk_read_bytes"
if exp, act := 346, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
if exp, act := 171, met.Counter(name).Value(); act.(uint64) != uint64(exp) {
t.Errorf("%s: expected %v, got %v", name, exp, act)
}
for _, timer := range []string{
3 changes: 0 additions & 3 deletions server/server_test.go
@@ -1032,7 +1032,6 @@ func TestCompileV1Observability(t *testing.T) {
"timer_rego_query_parse_ns",
"timer_server_handler_ns",
"counter_disk_read_keys",
"counter_disk_read_bytes",
"timer_disk_read_ns",
})
})
@@ -2460,7 +2459,6 @@ func TestDataMetricsEval(t *testing.T) {
testDataMetrics(t, f, "/data?metrics&partial", []string{
"counter_server_query_cache_hit",
"counter_disk_read_keys",
"counter_disk_read_bytes",
"timer_rego_input_parse_ns",
"timer_rego_query_eval_ns",
"timer_server_handler_ns",
@@ -3293,7 +3291,6 @@ func TestQueryV1(t *testing.T) {

assertMetricsExist(t, result.Metrics, []string{
"counter_disk_read_keys",
"counter_disk_read_bytes",
"timer_rego_query_compile_ns",
"timer_rego_query_eval_ns",
// "timer_server_handler_ns", // TODO(sr): we're not consistent about timing this?
39 changes: 35 additions & 4 deletions storage/disk/disk.go
@@ -121,6 +121,10 @@ type metadata struct {
Partitions []storage.Path `json:"partitions"` // caller-supplied data layout
}

// systemPartition is the partition we add automatically: no user-defined partition
// should apply to the /system path.
const systemPartition = "/system/*"

// New returns a new disk-based store based on the provided options.
func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, opts Options) (*Store, error) {

@@ -135,6 +139,14 @@ func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer,
}
}

partitions = append(partitions, storage.MustParsePath(systemPartition))
if !partitions.IsDisjoint() {
return nil, &storage.Error{
Code: storage.InternalErr,
Message: fmt.Sprintf("system partitions are managed: %v", opts.Partitions),
}
}

db, err := badger.Open(badgerConfigFromOptions(opts).WithLogger(&wrap{logger}))
if err != nil {
return nil, wrapError(err)
@@ -473,13 +485,32 @@ func (db *Store) validatePartitions(ctx context.Context, txn *badger.Txn, existi
removedPartitions := oldPathSet.Diff(newPathSet)
addedPartitions := newPathSet.Diff(oldPathSet)

if len(removedPartitions) > 0 {
// It's OK to replace partitions with wildcard partitions that overlap them:
// REMOVED: /foo/bar
// ADDED: /foo/*
// and the like.
replaced := make(pathSet, 0)
replacements := make(pathSet, 0)
for _, removed := range removedPartitions {
for _, added := range addedPartitions {
if isMatchedBy(removed, added) {
replaced = append(replaced, removed)
replacements = append(replacements, added)
}
}
}

rest := removedPartitions.Diff(replaced)
if len(rest) > 0 {
return &storage.Error{
Code: storage.InternalErr,
Message: fmt.Sprintf("partitions are backwards incompatible (old: %v, new: %v, missing: %v)", oldPathSet.Sorted(), newPathSet.Sorted(), removedPartitions.Sorted())}
Message: fmt.Sprintf("partitions are backwards incompatible (old: %v, new: %v, missing: %v)", oldPathSet, newPathSet, rest)}
}

for _, path := range addedPartitions {
for _, path := range addedPartitions.Diff(replacements) {
if prefix, wildcard := hasWildcard(path); wildcard {
path = prefix
}
for i := len(path); i > 0; i-- {
key, err := db.pm.DataPath2Key(path[:i])
if err != nil {
@@ -527,7 +558,7 @@ func (db *Store) diagnostics(ctx context.Context, partitions pathSet, logger log
if logger.GetLevel() < logging.Debug {
return nil
}
if len(partitions) == 0 {
if len(partitions) == 1 { // '/system/*' is always present
logger.Warn("no partitions configured")
if err := db.logPrefixStatistics(ctx, storage.MustParsePath("/"), logger); err != nil {
return err
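
To illustrate the replacement rule implemented above, a minimal sketch
following the test flow below (the directory value is an arbitrary
assumption): re-opening a store with a wildcard partition that covers a
previous concrete partition is now accepted; the reverse direction errors.

    package main

    import (
        "context"
        "fmt"

        "github.com/open-policy-agent/opa/logging"
        "github.com/open-policy-agent/opa/storage"
        "github.com/open-policy-agent/opa/storage/disk"
    )

    func main() {
        ctx := context.Background()
        dir := "/tmp/opa-disk-example" // assumption: a writable, initially empty directory

        // First run: a concrete partition.
        s, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{
            Dir:        dir,
            Partitions: []storage.Path{storage.MustParsePath("/foo/bar")},
        })
        if err != nil {
            panic(err)
        }
        s.Close(ctx)

        // Second run: /foo/* replaces /foo/bar, which this commit accepts.
        // Going back from /foo/* to /foo/bar would raise the
        // backwards-incompatibility error unconditionally.
        s, err = disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{
            Dir:        dir,
            Partitions: []storage.Path{storage.MustParsePath("/foo/*")},
        })
        fmt.Println(err) // expected: <nil>
        if err == nil {
            s.Close(ctx)
        }
    }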
167 changes: 132 additions & 35 deletions storage/disk/disk_test.go
@@ -188,6 +188,7 @@ func TestDataPartitioningValidation(t *testing.T) {
t.Fatal("unexpected code or message, got:", err)
}

// set up two partitions
s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/bar"),
storage.MustParsePath("/foo/baz"),
@@ -198,6 +199,7 @@

closeFn(ctx, s)

// init with same settings: nothing wrong
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/baz"),
storage.MustParsePath("/foo/bar"),
@@ -208,6 +210,7 @@

closeFn(ctx, s)

// adding another partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/baz"),
storage.MustParsePath("/foo/bar"),
@@ -217,6 +220,9 @@
t.Fatal(err)
}

// We're writing data under the partitions: this affects how some
// partition changes are treated; if they don't affect existing data,
// they are accepted.
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo/corge"), "x")
if err != nil {
t.Fatal(err)
@@ -234,7 +240,8 @@
storage.MustParsePath("/foo/bar"),
storage.MustParsePath("/foo/qux/corge"),
}})
if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (old: [/foo/bar /foo/baz /foo/qux], new: [/foo/bar /foo/baz /foo/qux/corge], missing: [/foo/qux])") {
if err == nil || !strings.Contains(err.Error(),
"partitions are backwards incompatible (old: [/foo/bar /foo/baz /foo/qux /system/*], new: [/foo/bar /foo/baz /foo/qux/corge /system/*], missing: [/foo/qux])") {
t.Fatal(err)
}

@@ -275,9 +282,95 @@
}

closeFn(ctx, s)

// switching to wildcard partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)

// adding another partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/the/snow/*"),
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)

// switching to a partition with multiple wildcards
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/*/*/*"),
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)

// there is no going back
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/the/snow/*"),
storage.MustParsePath("/foo/*"),
}})
if err == nil || !strings.Contains(err.Error(),
"partitions are backwards incompatible (old: [/foo/* /fox/in/*/*/* /system/*], new: [/foo/* /fox/in/the/snow/* /system/*], missing: [/fox/in/*/*/*])",
) {
t.Fatal(err)
}
closeFn(ctx, s)

// adding a wildcard partition requires that there is no content under its
// non-wildcard prefix: we open the db with previously used partitions,
// write another key, and re-open with an extra wildcard partition
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/*/*/*"),
storage.MustParsePath("/foo/*"),
}})
if err != nil {
t.Fatal(err)
}
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/peanutbutter/jelly"), true)
if err != nil {
t.Fatal(err)
}
closeFn(ctx, s)
s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/fox/in/*/*/*"),
storage.MustParsePath("/peanutbutter/*"),
storage.MustParsePath("/foo/*"),
}})
if err == nil || !strings.Contains(err.Error(), "partitions are backwards incompatible (existing data: /peanutbutter)") {
t.Fatal("expected to find existing key but got:", err)
}
closeFn(ctx, s)
})
}

func TestDataPartitioningSystemPartitions(t *testing.T) {
ctx := context.Background()
dir := "unused"

for _, part := range []string{
"/system",
"/system/*",
"/system/a",
"/system/a/b",
} {
_, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath(part),
}})
if err == nil || !strings.Contains(err.Error(), "system partitions are managed") {
t.Fatal(err)
}
}
}

func TestDataPartitioningReadsAndWrites(t *testing.T) {

tests := []struct {
@@ -1155,43 +1248,47 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) {
}

func TestDataPartitioningWriteInvalidPatchError(t *testing.T) {
test.WithTempFS(map[string]string{}, func(dir string) {
ctx := context.Background()
s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo"),
}})
if err != nil {
t.Fatal(err)
}
defer s.Close(ctx)
for _, pt := range []string{"/*", "/foo"} {
t.Run(pt, func(t *testing.T) {
test.WithTempFS(map[string]string{}, func(dir string) {
ctx := context.Background()
s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{
storage.MustParsePath("/foo"),
}})
if err != nil {
t.Fatal(err)
}
defer s.Close(ctx)

err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}

err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{"foo": [1,2,3]}`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{"foo": [1,2,3]}`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}

err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
})
err = storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`[1,2,3]`)))
if err == nil {
t.Fatal("expected error")
} else if sErr, ok := err.(*storage.Error); !ok {
t.Fatal("expected storage error but got:", err)
} else if sErr.Code != storage.InvalidPatchErr {
t.Fatal("expected invalid patch error but got:", err)
}
})
})
}
}

func executeTestWrite(ctx context.Context, t *testing.T, s storage.Store, x testWrite) {
