From 516dd47dd1d233359e0b0311158153dbf98136b8 Mon Sep 17 00:00:00 2001
From: Stephan Renatus
Date: Wed, 23 Feb 2022 11:38:02 +0100
Subject: [PATCH] runtime+storage: integrate disk storage

With this change, the disk backend (badger) becomes properly available
for use with the OPA runtime: it can be configured using the
`storage.disk` key in OPA's config (see included documentation). When
enabled,

- any data or policies stored with OPA will persist over restarts
- per-query metrics related to disk usage are reported
- Prometheus metrics per storage operation are exported

The main intention behind this feature is to optimize memory usage:
OPA can now operate on more data than fits into the allotted memory
resources. It is NOT meant to be used as a primary source of truth:
there are no backup/restore or disaster recovery procedures -- you
MUST secure the means to restore the data stored with OPA's disk
storage yourself.

See also #4014. Future improvements around bundle loading are planned.

Some notes on details:

storage/disk: impose same locking regime used with inmem

With this setup, we ensure that:

- there is only one open write txn at a time
- there can be any number of open read txns at a time
- writes are blocked while reads are in flight
- during a commit (and while triggers are run), no read txns can be
  created

This is to ensure the same atomic policy update semantics when using
"disk" as we have with "inmem". We're essentially opting out of
badger's concurrency control and transactionality guarantees, because
we cannot piggyback on them to get the atomic updates we want. There
might be other ways -- using subscribers, and blocking in some other
place -- but this one seems preferable since it mirrors inmem.

Part of the problem is ErrTxnTooLarge, and committing and renewing
txns when it occurs: that approach, the prescribed solution to txns
growing too big, also means that reads could observe half of the
"logical" transaction as committed while the rest is still being
processed.

Another approach would have been using `WriteBatch`, but that won't
let us read from the batch, only apply Set and Delete operations. We
currently need to read (via an iterator) to figure out if we need to
delete keys in order to replace something in the store. There is no
DropPrefix operation on the badger txn, nor on the WriteBatch API.

storage/disk: remove commit-and-renew-txn code for txn-too-big errors

This would break the transactional guarantees we care about: while
there can be only one write transaction at a time, read transactions
may happen while a write txn is underway -- with this
commit-and-reset logic, those would read partial data. Now, the error
is returned to the caller. The maximum txn size depends on the size
of the memtables, and could be tweaked manually. In general, the
caller should try to push multiple smaller increments of the data.

storage/disk: implement noop MakeDir

The MakeDir operation as implemented in the backend-agnostic storage
code has become an issue with the disk store: to write /foo/bar/baz,
we'd have to read /foo (among other subdirs), and that can be _much_
work for the disk backend. With inmem, it's cheap, so this wasn't
problematic before. Some of the storage/disk/txn.go logic had to be
adjusted to properly do the MakeDir steps implicitly.
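As a rough illustration of the locking regime described above, here is
a minimal sketch with a hypothetical `store` type -- not the actual
disk.Store implementation -- showing the reader-writer lock that guards
commits plus the mutex that serializes writers:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // store mirrors the two-lock setup: rmu is held shared by read txns
    // and exclusively during commit; wmu serializes write txns.
    type store struct {
    	rmu sync.RWMutex
    	wmu sync.Mutex
    }

    func (s *store) read(f func()) {
    	s.rmu.RLock()
    	defer s.rmu.RUnlock()
    	f()
    }

    func (s *store) write(stage func()) {
    	s.wmu.Lock() // only one open write txn at a time
    	defer s.wmu.Unlock()
    	stage()      // stage the writes
    	s.rmu.Lock() // wait for in-flight reads; keep new reads out during commit
    	defer s.rmu.Unlock()
    	// apply the staged writes and run triggers here
    }

    func main() {
    	s := &store{}
    	s.write(func() { fmt.Println("staging writes") })
    	s.read(func() { fmt.Println("consistent read") })
    }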
The index argument addition to patch() in storage/disk/txn.go was
necessary to keep the error messages conforming to the previous code
path: previously, conflicts (arrays indexed as objects) would be
surfaced in the MakeDir step; now it's entangled with the patch
calculation.

storage/disk: check ctx.Err() in List/Get operations

This won't abort reading a single key, but it will abort iterations.

storage/disk: support patterns in partitions

There is a potential clash here: "*", the path wildcard, is a valid
path section. However, it only affects the case where a user wants a
partition at /foo/*/bar and really means a literal "*", not the
wildcard. Storing data at /foo/*/bar with a literal "*" won't be
treated any differently from storing something at /foo/xyz/bar.

storage/disk: keep per-txn-type histograms of stats

This is done by reading off the metrics on commit, and shovelling
their numbers into the Prometheus collector.

NOTE: if you were to share a metrics object among multiple
transactions, the results would be skewed, as it's not reset.
However, our server handlers don't do that.

storage/disk: opt out of badger's conflict detection

With only one write transaction in flight at any time, the situation
that badger guards against cannot happen: a transaction writing to a
key after the current, to-be-committed transaction has last read that
key from the store. Since it can't happen, we can skip the
bookkeeping involved. This improves the time it takes to overwrite
existing keys.

Signed-off-by: Stephan Renatus
---
 config/config.go                              |   3 +
 docs/content/configuration.md                 |  17 +
 docs/content/misc-disk.md                     | 177 ++++
 docs/content/rest-api.md                      |   8 +
 internal/config/config.go                     |   6 +-
 plugins/bundle/plugin.go                      |   4 +-
 plugins/bundle/plugin_test.go                 | 119 +++-
 plugins/plugins.go                            |   5 +
 rego/example_test.go                          |   5 +-
 runtime/runtime.go                            |  32 +-
 server/server.go                              |  92 ++-
 server/server_test.go                         | 558 ++++++++++-------
 storage/disk/config.go                        |  75 +++
 storage/disk/config_test.go                   | 238 ++++++++
 storage/disk/disk.go                          | 265 +++++++-
 storage/disk/disk_test.go                     | 572 +++++++++++++++---
 storage/disk/example_test.go                  |   5 +-
 storage/disk/metrics.go                       |  51 ++
 storage/disk/partition.go                     |   7 +-
 storage/disk/partition_test.go                |  22 +-
 storage/disk/paths.go                         |  23 +-
 storage/disk/paths_test.go                    |  59 ++
 storage/disk/txn.go                           | 293 +++++----
 storage/disk/txn_test.go                      | 109 ++--
 storage/errors.go                             |   2 +-
 storage/inmem/inmem_test.go                   |   2 +-
 storage/inmem/txn.go                          |   7 +-
 storage/interface.go                          |  23 +
 storage/internal/errors/errors.go             |   7 +
 storage/internal/ptr/ptr.go                   |  28 +-
 test/authz/authz_bench_test.go                |  44 +-
 .../e2e/authz/authz_bench_integration_test.go |   8 +-
 test/e2e/authz/disk.go                        |  25 +
 test/e2e/authz/nodisk.go                      |  15 +
 test/e2e/concurrency/concurrency_test.go      |  25 +-
 test/e2e/shutdown/shutdown_test.go            |   2 +-
 .../authz/authz_bench_integration_test.go     |   9 +-
 test/e2e/wasm/authz/disk.go                   |  25 +
 test/e2e/wasm/authz/nodisk.go                 |  15 +
 39 files changed, 2476 insertions(+), 506 deletions(-)
 create mode 100644 docs/content/misc-disk.md
 create mode 100644 storage/disk/config.go
 create mode 100644 storage/disk/config_test.go
 create mode 100644 storage/disk/metrics.go
 create mode 100644 storage/disk/paths_test.go
 create mode 100644 test/e2e/authz/disk.go
 create mode 100644 test/e2e/authz/nodisk.go
 create mode 100644 test/e2e/wasm/authz/disk.go
 create mode 100644 test/e2e/wasm/authz/nodisk.go

diff --git a/config/config.go b/config/config.go
index b38a432207..837b075a38 100644
--- a/config/config.go
+++ b/config/config.go
@@ -34,6 +34,9 @@ type Config struct {
 	Caching              json.RawMessage `json:"caching,omitempty"`
 	PersistenceDirectory *string         `json:"persistence_directory,omitempty"`
 	DistributedTracing   json.RawMessage `json:"distributed_tracing,omitempty"`
+	Storage              *struct {
+		Disk json.RawMessage `json:"disk,omitempty"`
+	} `json:"storage,omitempty"`
 }
 
 // ParseConfig returns a valid Config object with defaults injected. The id
diff --git a/docs/content/configuration.md b/docs/content/configuration.md
index 4366e48f03..1a20fad004 100644
--- a/docs/content/configuration.md
+++ b/docs/content/configuration.md
@@ -837,3 +837,20 @@ The following encryption methods are supported:
 | `off`  | Disable TLS |
 | `tls`  | Enable TLS |
 | `mtls` | Enable mutual TLS |
+
+### Disk Storage
+
+The `storage` configuration key allows for enabling, and configuring, the
+persistent on-disk storage of an OPA instance.
+
+If the `disk` key is set, the server will enable the on-disk store,
+with data kept in the configured `directory`.
+
+| Field | Type | Required | Description |
+| --- | --- | --- | --- |
+| `storage.disk.directory` | `string` | Yes | The directory to use for storing the persistent database. |
+| `storage.disk.auto_create` | `bool` | No (default: `false`) | If set to true, the configured directory will be created if it does not exist. |
+| `storage.disk.partitions` | `array[string]` | No | Non-overlapping `data` prefixes used for partitioning the data on disk. |
+| `storage.disk.badger` | `string` | No (default: empty) | "Superflags" passed to Badger, allowing advanced options to be modified. |
+
+See [the docs on disk storage](../misc-disk/) for details about the settings.
diff --git a/docs/content/misc-disk.md b/docs/content/misc-disk.md
new file mode 100644
index 0000000000..fb0208ef08
--- /dev/null
+++ b/docs/content/misc-disk.md
@@ -0,0 +1,177 @@
+---
+title: Disk Storage
+kind: misc
+weight: 10
+---
+
+This page outlines configuration options relevant to using the disk storage
+feature of OPA.
+The configuration options themselves are described in [the configuration docs](../configuration/#disk-storage).
+
+{{< info >}}
+The persistent disk storage enables OPA to work with data that does not fit
+into the memory resources granted to the OPA server.
+It is **not** supposed to be used as the primary source of truth for that data.
+
+The on-disk storage should be considered ephemeral: you need to secure the
+means to restore that data.
+Backup and restore, or repair procedures for data corruption, are not provided
+at this time.
+{{< /info >}}
+
+## Partitions
+
+Partitions determine how the JSON data is split up when stored in the
+underlying key-value store.
+For example, this table shows how an example document would be stored given
+different configured partitions:
+
+```json
+{
+  "users": {
+    "alice": { "roles": ["admin"] },
+    "bob": { "roles": ["viewer"] }
+  }
+}
+```
+
+| Partitions | Keys | Values |
+| --- | --- | --- |
+| (1) none | `/users` | `{"alice": {"roles": ["admin"]}, "bob": {"roles": ["viewer"]}}` |
+| --- | --- | --- |
+| (2) `/users` | `/users/alice` | `{"roles": ["admin"]}` |
+| | `/users/bob` | `{"roles": ["viewer"]}` |
+| --- | --- | --- |
+| (3) `/users/*` | `/users/alice/roles` | `["admin"]` |
+| | `/users/bob/roles` | `["viewer"]` |
+
+Partitioning has consequences for performance: in the example above, the
+number of keys to retrieve from the database (and the amount of data in
+their values) varies.
+
+| Query | Partitions | Number of keys read |
+| --- | --- | --- |
+| `data.users` | (1) | 1 |
+| | (2) | 2 |
+| | (3) | 2 |
+| --- | --- | --- |
+| `data.users.alice` | (1) | 1 (with `bob`'s data thrown away) |
+| | (2) | 2 |
+| | (3) | 2 |
+
+For example, retrieving the full extent of `data.users` from the disk store
+requires a single key fetch with the partitions of (1).
+With (2), the storage engine will fetch two keys and their values.
+
+Retrieving a single user's data, e.g. `data.users.alice`, requires reading
+a single key with (1) -- but that key holds all users' data, and everything
+not belonging to `alice` is thrown away.
+
+There is no one-size-fits-all setting for partitions: good settings depend
+on the actual usage, and that comes down to the policies that are used with
+OPA.
+Commonly, you would optimize the partition settings for those queries that
+are performance critical.
+
+To spot suboptimal partitioning, have a look at the exposed metrics.
+
+## Metrics
+
+Using the [REST API](../rest-api/), you can include the `?metrics` query string
+to gain insight into the disk storage accesses related to a given OPA query.
+
+```
+$ curl 'http://localhost:8181/v1/data/tenants/acme1/bindings/user1?metrics' | opa eval -I 'input.metrics' -fpretty
+{
+  "counter_disk_read_bytes": 339,
+  "counter_disk_read_keys": 3,
+  "counter_server_query_cache_hit": 1,
+  "timer_disk_read_ns": 40736,
+  "timer_rego_external_resolve_ns": 251,
+  "timer_rego_input_parse_ns": 656,
+  "timer_rego_query_eval_ns": 66616,
+  "timer_server_handler_ns": 117539
+}
+```
+
+The `timer_disk_*_ns` timers indicate how much time was spent on the
+different disk operations.
+
+Available timers are:
+- `timer_disk_read_ns`
+- `timer_disk_write_ns`
+- `timer_disk_commit_ns`
+
+Also note the `counter_disk_*` counters in the metrics:
+
+- `counter_disk_read_keys`: number of keys retrieved
+- `counter_disk_written_keys`: number of keys written
+- `counter_disk_deleted_keys`: number of keys deleted
+- `counter_disk_read_bytes`: bytes retrieved
+
+Suboptimal partition settings can be spotted when the number of keys and
+bytes retrieved for a query is disproportionate to the data actually
+returned: the query likely had to retrieve a giant JSON object, and most
+of it was thrown away.
+
+## Debug Logging
+
+Pass `--log-level debug` to `opa run` to see all the underlying storage
+engine's logs.
+
+When debug logging is _enabled_, the service will output some
+statistics about the configured disk partitions and their key
+sizes.
+
+```
+[DEBUG] partition /tenants/acme3/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme4/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme8/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme9/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme0/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme2/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme6/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+```
+
+Note that this process iterates over all database keys.
+It only happens on startup, and only when debug logging is enabled.
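+
+For example, assuming the server was configured via a `config.yaml` file
+containing a `storage.disk` section (the file name here is illustrative),
+the statistics above can be produced with:
+
+```
+opa run --server --config-file config.yaml --log-level debug
+```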
+
+## Fine-tuning Badger settings (superflags)
+
+While partitioning should be the first thing to look into to tune the memory
+usage and performance of the on-disk storage engine, this setting gives you
+the means to change many internal aspects of how Badger uses memory and disk
+storage.
+
+{{< danger >}}
+Use with care!
+
+Any of the Badger settings used by OPA can be overridden using this feature.
+No validation is performed on options set using this flag.
+
+When the embedded Badger version changes, these options could change, too.
+{{< /danger >}}
+
+The options correspond to the Badger settings that can be set on [the library's Options
+struct](https://pkg.go.dev/github.com/dgraph-io/badger/v3#Options).
+
+The following options can *not* be overridden:
+- `dir`
+- `valuedir`
+- `detectconflicts`
+
+Aside from conflict detection, Badger in OPA uses the default options [you can find here](https://github.com/dgraph-io/badger/blob/v3.2103.2/options.go#L128-L187).
+
+Conflict detection is disabled because the locking scheme used within OPA
+does not allow multiple concurrent writes.
+
+### Example
+
+```yaml
+storage:
+  disk:
+    directory: /tmp/disk
+    badger: nummemtables=1; numgoroutines=2; maxlevels=3
+```
\ No newline at end of file
diff --git a/docs/content/rest-api.md b/docs/content/rest-api.md
index 34d03544a0..09d81a57c9 100644
--- a/docs/content/rest-api.md
+++ b/docs/content/rest-api.md
@@ -1002,6 +1002,10 @@ If the path does not refer to an existing document, the server will attempt to c
 The server will respect the `If-None-Match` header if it is set to `*`.
 In this case, the server will not overwrite an existing document located at the path.
 
+#### Query Parameters
+
+- **metrics** - Return performance metrics in addition to result. See [Performance Metrics](#performance-metrics) for more detail.
+
 #### Status Codes
 
 - **204** - no content (success)
@@ -1094,6 +1098,10 @@ Delete a document.
 
 The server processes the DELETE method as if the client had sent a PATCH request containing a single remove operation.
 
+#### Query Parameters
+
+- **metrics** - Return performance metrics in addition to result. See [Performance Metrics](#performance-metrics) for more detail.
+ #### Status Codes - **204** - no content (success) diff --git a/internal/config/config.go b/internal/config/config.go index 69d930f4a4..b2020181a0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -83,7 +83,7 @@ func Load(configFile string, overrides []string, overrideFiles []string) ([]byte processedConf := subEnvVars(string(bytes)) if err := yaml.Unmarshal([]byte(processedConf), &baseConf); err != nil { - return []byte{}, fmt.Errorf("failed to parse %s: %s", configFile, err) + return nil, fmt.Errorf("failed to parse %s: %s", configFile, err) } } @@ -93,7 +93,7 @@ func Load(configFile string, overrides []string, overrideFiles []string) ([]byte for _, override := range overrides { processedOverride := subEnvVars(override) if err := strvals.ParseInto(processedOverride, overrideConf); err != nil { - return []byte{}, fmt.Errorf("failed parsing --set data: %s", err) + return nil, fmt.Errorf("failed parsing --set data: %s", err) } } @@ -105,7 +105,7 @@ func Load(configFile string, overrides []string, overrideFiles []string) ([]byte return value, err } if err := strvals.ParseIntoFile(override, overrideConf, reader); err != nil { - return []byte{}, fmt.Errorf("failed parsing --set-file data: %s", err) + return nil, fmt.Errorf("failed parsing --set-file data: %s", err) } } diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index 2080187120..fba263c5fb 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -188,7 +188,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) { // Deactivate the bundles that were removed params := storage.WriteParams - params.Context = storage.NewContext() + params.Context = storage.NewContext() // TODO(sr): metrics? err := storage.Txn(ctx, p.manager.Store, params, func(txn storage.Transaction) error { opts := &bundle.DeactivateOpts{ Ctx: ctx, @@ -513,7 +513,7 @@ func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) er p.log(name).Debug("Bundle activation in progress. 
Opening storage transaction.") params := storage.WriteParams - params.Context = storage.NewContext() + params.Context = storage.NewContext().WithMetrics(p.status[name].Metrics) err := storage.Txn(ctx, p.manager.Store, params, func(txn storage.Transaction) error { p.log(name).Debug("Opened storage transaction (%v).", txn.ID()) diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index 7820e1d04b..4cce413326 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -23,20 +23,20 @@ import ( "testing" "time" - "github.com/open-policy-agent/opa/internal/file/archive" - - "github.com/open-policy-agent/opa/util/test" - "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/bundle" "github.com/open-policy-agent/opa/config" "github.com/open-policy-agent/opa/download" + "github.com/open-policy-agent/opa/internal/file/archive" "github.com/open-policy-agent/opa/keys" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/plugins" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/util" + "github.com/open-policy-agent/opa/util/test" ) func TestPluginOneShot(t *testing.T) { @@ -97,6 +97,111 @@ func TestPluginOneShot(t *testing.T) { } } +func TestPluginOneShotDiskStorageMetrics(t *testing.T) { + + test.WithTempFS(nil, func(dir string) { + ctx := context.Background() + met := metrics.New() + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + Partitions: []storage.Path{ + storage.MustParsePath("/foo"), + }, + }) + if err != nil { + t.Fatal(err) + } + manager := getTestManagerWithOpts(nil, store) + defer manager.Stop(ctx) + plugin := New(&Config{}, manager) + bundleName := "test-bundle" + plugin.status[bundleName] = &Status{Name: bundleName, Metrics: met} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) + + ensurePluginState(t, plugin, plugins.StateNotReady) + + module := "package foo\n\ncorge=1" + + b := bundle.Bundle{ + Manifest: bundle.Manifest{Revision: "quickbrownfaux"}, + Data: util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}}`)).(map[string]interface{}), + Modules: []bundle.ModuleFile{ + { + Path: "/foo/bar", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + } + + b.Manifest.Init() + + met = metrics.New() + plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b, Metrics: met}) + + ensurePluginState(t, plugin, plugins.StateOK) + + // NOTE(sr): These assertion reflect the current behaviour only! Not prescriptive. 
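+		// The counters below are collected by the disk store per transaction and
+		// surfaced through the metrics object passed in via download.Update above;
+		// the exact values depend on the bundle contents and the configured partitions.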
+ name := "disk_deleted_keys" + if exp, act := 1, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + name = "disk_written_keys" + if exp, act := 7, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + name = "disk_read_keys" + if exp, act := 13, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + name = "disk_read_bytes" + if exp, act := 346, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + for _, timer := range []string{ + "disk_commit", + "disk_write", + "disk_read", + } { + if act := met.Timer(timer).Int64(); act <= 0 { + t.Errorf("%s: expected non-zero timer, got %v", timer, act) + } + } + if t.Failed() { + t.Logf("all metrics: %v", met.All()) + } + + // Ensure we can read it all back -- this is the only bundle plugin test using disk storage, + // so some duplicating with TestPluginOneShot is OK: + + txn := storage.NewTransactionOrDie(ctx, manager.Store) + defer manager.Store.Abort(ctx, txn) + + ids, err := manager.Store.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } else if len(ids) != 1 { + t.Fatal("Expected 1 policy") + } + + bs, err := manager.Store.GetPolicy(ctx, txn, ids[0]) + exp := []byte("package foo\n\ncorge=1") + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(bs, exp) { + t.Fatalf("Bad policy content. Exp:\n%v\n\nGot:\n\n%v", string(exp), string(bs)) + } + + data, err := manager.Store.Read(ctx, txn, storage.Path{}) + expData := util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}, "system": {"bundles": {"test-bundle": {"manifest": {"revision": "quickbrownfaux", "roots": [""]}}}}}`)) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data, expData) { + t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) + } + }) +} + func TestPluginOneShotDeltaBundle(t *testing.T) { ctx := context.Background() @@ -1654,8 +1759,12 @@ func getTestManager() *plugins.Manager { return getTestManagerWithOpts(nil) } -func getTestManagerWithOpts(config []byte) *plugins.Manager { +func getTestManagerWithOpts(config []byte, stores ...storage.Store) *plugins.Manager { store := inmem.New() + if len(stores) == 1 { + store = stores[0] + } + manager, err := plugins.New(config, "test-instance-id", store) if err != nil { panic(err) diff --git a/plugins/plugins.go b/plugins/plugins.go index a3120b6155..85c260b5fb 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -625,6 +625,11 @@ func (m *Manager) Stop(ctx context.Context) { for i := range toStop { toStop[i].Stop(ctx) } + if c, ok := m.Store.(interface{ Close(context.Context) error }); ok { + if err := c.Close(ctx); err != nil { + m.logger.Error("Error closing store: %v", err) + } + } } // Reconfigure updates the configuration on the manager. diff --git a/rego/example_test.go b/rego/example_test.go index 039c52986c..40d2d0ea2b 100644 --- a/rego/example_test.go +++ b/rego/example_test.go @@ -14,6 +14,7 @@ import ( "os" "strings" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/ast" @@ -352,7 +353,7 @@ func ExampleRego_Eval_persistent_storage() { // user's data is stored on a different row. Assuming the policy only reads // data for a single user to process the policy query, OPA can avoid loading // _all_ user data into memory this way. 
- store, err := disk.New(ctx, disk.Options{ + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: rootDir, Partitions: []storage.Path{{"example", "user"}}, }) @@ -379,7 +380,7 @@ func ExampleRego_Eval_persistent_storage() { // Re-open the store in the same directory. store.Close(ctx) - store2, err := disk.New(ctx, disk.Options{ + store2, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: rootDir, Partitions: []storage.Path{{"example", "user"}}, }) diff --git a/runtime/runtime.go b/runtime/runtime.go index 2fe38e3345..9ac3c3baa3 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -45,6 +45,7 @@ import ( "github.com/open-policy-agent/opa/repl" "github.com/open-policy-agent/opa/server" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/tracing" "github.com/open-policy-agent/opa/util" @@ -203,6 +204,11 @@ type Params struct { // If it is nil, a new mux.Router will be created Router *mux.Router + // DiskStorage, if set, will make the runtime instantiate a disk-backed storage + // implementation (instead of the default, in-memory store). + // It can also be enabled via config, and this runtime field takes precedence. + DiskStorage *disk.Options + DistributedTracingOpts tracing.Options } @@ -300,14 +306,11 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { return nil, err } - var consoleLogger logging.Logger - - if params.ConsoleLogger == nil { + consoleLogger := params.ConsoleLogger + if consoleLogger == nil { l := logging.New() l.SetFormatter(internal_logging.GetFormatter(params.Logging.Format)) consoleLogger = l - } else { - consoleLogger = params.ConsoleLogger } if params.Router == nil { @@ -316,9 +319,26 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { metrics := prometheus.New(metrics.New(), errorLogger(logger)) + var store storage.Store + if params.DiskStorage == nil { + params.DiskStorage, err = disk.OptionsFromConfig(config, params.ID) + if err != nil { + return nil, fmt.Errorf("parse disk store configuration: %w", err) + } + } + + if params.DiskStorage != nil { + store, err = disk.New(ctx, logger, metrics, *params.DiskStorage) + if err != nil { + return nil, fmt.Errorf("initialize disk store: %w", err) + } + } else { + store = inmem.New() + } + manager, err := plugins.New(config, params.ID, - inmem.New(), + store, plugins.Info(info), plugins.InitBundles(loaded.Bundles), plugins.InitFiles(loaded.Files), diff --git a/server/server.go b/server/server.go index 44181667fb..e52bc40d9a 100644 --- a/server/server.go +++ b/server/server.go @@ -1222,7 +1222,8 @@ func (s *Server) v1CompilePost(w http.ResponseWriter, r *http.Request) { m.Timer(metrics.RegoQueryParse).Stop() - txn, err := s.store.NewTransaction(ctx) + c := storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, storage.TransactionParams{Context: c}) if err != nil { writer.ErrorAuto(w, err) return @@ -1331,7 +1332,8 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { m.Timer(metrics.RegoInputParse).Stop() // Prepare for query. 
- txn, err := s.store.NewTransaction(ctx) + c := storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, storage.TransactionParams{Context: c}) if err != nil { writer.ErrorAuto(w, err) return @@ -1455,15 +1457,21 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { } func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) { + m := metrics.New() + m.Timer(metrics.ServerHandler).Start() + defer m.Timer(metrics.ServerHandler).Stop() + ctx := r.Context() vars := mux.Vars(r) - + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) ops := []types.PatchV1{} + m.Timer(metrics.RegoInputParse).Start() if err := util.NewJSONDecoder(r.Body).Decode(&ops); err != nil { writer.ErrorString(w, http.StatusBadRequest, types.CodeInvalidParameter, err) return } + m.Timer(metrics.RegoInputParse).Stop() patches, err := s.prepareV1PatchSlice(vars["path"], ops) if err != nil { @@ -1471,7 +1479,9 @@ func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) { return } - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1500,6 +1510,14 @@ func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) { return } + if includeMetrics { + result := types.DataResponseV1{ + Metrics: m.All(), + } + writer.JSON(w, http.StatusOK, result, false) + return + } + writer.Bytes(w, http.StatusNoContent, nil) } @@ -1541,7 +1559,9 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { m.Timer(metrics.RegoInputParse).Stop() - txn, err := s.store.NewTransaction(ctx) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1674,14 +1694,21 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { } func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { + m := metrics.New() + m.Timer(metrics.ServerHandler).Start() + defer m.Timer(metrics.ServerHandler).Stop() + ctx := r.Context() vars := mux.Vars(r) + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) + m.Timer(metrics.RegoInputParse).Start() var value interface{} if err := util.NewJSONDecoder(r.Body).Decode(&value); err != nil { writer.ErrorString(w, http.StatusBadRequest, types.CodeInvalidParameter, err) return } + m.Timer(metrics.RegoInputParse).Stop() path, ok := storage.ParsePathEscaped("/" + strings.Trim(vars["path"], "/")) if !ok { @@ -1689,7 +1716,9 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { return } - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1701,15 +1730,16 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { } _, err = s.store.Read(ctx, txn, path) - if err != nil { if !storage.IsNotFound(err) { s.abortAuto(ctx, txn, w, err) return } - if err := storage.MakeDir(ctx, s.store, txn, path[:len(path)-1]); err != nil { - s.abortAuto(ctx, txn, w, err) - return + if len(path) > 0 { + if err := storage.MakeDir(ctx, s.store, txn, path[:len(path)-1]); err != nil { + s.abortAuto(ctx, txn, w, err) + return + } } } else if r.Header.Get("If-None-Match") == "*" { 
s.store.Abort(ctx, txn) @@ -1733,12 +1763,25 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { return } + if includeMetrics { + result := types.DataResponseV1{ + Metrics: m.All(), + } + writer.JSON(w, http.StatusOK, result, false) + return + } + writer.Bytes(w, http.StatusNoContent, nil) } func (s *Server) v1DataDelete(w http.ResponseWriter, r *http.Request) { + m := metrics.New() + m.Timer(metrics.ServerHandler).Start() + defer m.Timer(metrics.ServerHandler).Stop() + ctx := r.Context() vars := mux.Vars(r) + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) path, ok := storage.ParsePathEscaped("/" + strings.Trim(vars["path"], "/")) if !ok { @@ -1746,7 +1789,9 @@ func (s *Server) v1DataDelete(w http.ResponseWriter, r *http.Request) { return } - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1773,6 +1818,14 @@ func (s *Server) v1DataDelete(w http.ResponseWriter, r *http.Request) { return } + if includeMetrics { + result := types.DataResponseV1{ + Metrics: m.All(), + } + writer.JSON(w, http.StatusOK, result, false) + return + } + writer.Bytes(w, http.StatusNoContent, nil) } @@ -1780,7 +1833,7 @@ func (s *Server) v1PoliciesDelete(w http.ResponseWriter, r *http.Request) { ctx := r.Context() vars := mux.Vars(r) pretty := getBoolParam(r.URL, types.ParamPrettyV1, true) - includeMetrics := getBoolParam(r.URL, types.ParamPrettyV1, true) + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) id, err := url.PathUnescape(vars["path"]) if err != nil { @@ -1789,8 +1842,9 @@ func (s *Server) v1PoliciesDelete(w http.ResponseWriter, r *http.Request) { } m := metrics.New() - - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1948,7 +2002,9 @@ func (s *Server) v1PoliciesPut(w http.ResponseWriter, r *http.Request) { m.Timer("server_read_bytes").Stop() - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -2071,7 +2127,8 @@ func (s *Server) v1QueryGet(w http.ResponseWriter, r *http.Request) { includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) includeInstrumentation := getBoolParam(r.URL, types.ParamInstrumentV1, true) - txn, err := s.store.NewTransaction(ctx) + params := storage.TransactionParams{Context: storage.NewContext().WithMetrics(m)} + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -2140,7 +2197,8 @@ func (s *Server) v1QueryPost(w http.ResponseWriter, r *http.Request) { } } - txn, err := s.store.NewTransaction(ctx) + params := storage.TransactionParams{Context: storage.NewContext().WithMetrics(m)} + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return diff --git a/server/server_test.go b/server/server_test.go index 3596c141f5..d5cd2551b7 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/open-policy-agent/opa/bundle" "github.com/open-policy-agent/opa/config" 
"github.com/open-policy-agent/opa/internal/distributedtracing" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/plugins" pluginBundle "github.com/open-policy-agent/opa/plugins/bundle" @@ -36,8 +37,10 @@ import ( "github.com/open-policy-agent/opa/server/types" "github.com/open-policy-agent/opa/server/writer" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/util" + "github.com/open-policy-agent/opa/util/test" "github.com/open-policy-agent/opa/version" ) @@ -991,35 +994,48 @@ func TestCompileV1(t *testing.T) { } func TestCompileV1Observability(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } + f := newFixtureWithStore(t, disk) - f := newFixture(t) - - err := f.v1(http.MethodPut, "/policies/test", `package test + err = f.v1(http.MethodPut, "/policies/test", `package test p { input.x = 1 }`, 200, "") - if err != nil { - t.Fatal(err) - } + if err != nil { + t.Fatal(err) + } - compileReq := newReqV1(http.MethodPost, "/compile?metrics&explain=full", `{ + compileReq := newReqV1(http.MethodPost, "/compile?metrics&explain=full", `{ "query": "data.test.p = true" }`) - f.reset() - f.server.Handler.ServeHTTP(f.recorder, compileReq) + f.reset() + f.server.Handler.ServeHTTP(f.recorder, compileReq) - var response types.CompileResponseV1 - if err := json.NewDecoder(f.recorder.Body).Decode(&response); err != nil { - t.Fatal(err) - } + var response types.CompileResponseV1 + if err := json.NewDecoder(f.recorder.Body).Decode(&response); err != nil { + t.Fatal(err) + } - if len(response.Explanation) == 0 { - t.Fatal("Expected non-empty explanation") - } + if len(response.Explanation) == 0 { + t.Fatal("Expected non-empty explanation") + } - if _, ok := response.Metrics["timer_rego_partial_eval_ns"]; !ok { - t.Fatal("Expected partial evaluation latency") - } + assertMetricsExist(t, response.Metrics, []string{ + "timer_rego_partial_eval_ns", + "timer_rego_query_compile_ns", + "timer_rego_query_parse_ns", + "timer_server_handler_ns", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_disk_read_ns", + }) + }) } func TestCompileV1UnsafeBuiltin(t *testing.T) { @@ -1175,13 +1191,23 @@ p = true { false }` }}, {"patch root", []tr{ {http.MethodPatch, "/data", `[ - {"op": "add", - "path": "/", - "value": {"a": 1, "b": 2} + { + "op": "add", + "path": "/", + "value": {"a": 1, "b": 2} } ]`, 204, ""}, {http.MethodGet, "/data", "", 200, `{"result": {"a": 1, "b": 2}}`}, }}, + {"patch root invalid", []tr{ + {http.MethodPatch, "/data", `[ + { + "op": "add", + "path": "/", + "value": [1,2,3] + } + ]`, 400, ""}, + }}, {"patch invalid", []tr{ {http.MethodPatch, "/data", `[ { @@ -1547,11 +1573,63 @@ p = true { false }` for _, tc := range tests { t.Run(tc.note, func(t *testing.T) { - executeRequests(t, tc.reqs) + test.WithTempFS(nil, func(root string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } + executeRequests(t, tc.reqs, + variant{"inmem", nil}, + variant{"disk", []func(*Server){ + func(s *Server) { + s.WithStore(disk) + }, + }}, + ) + }) }) } } +func 
TestDataV1Metrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } + + f := newFixtureWithStore(t, disk) + put := newReqV1(http.MethodPut, `/data?metrics`, `{"foo":"bar"}`) + f.server.Handler.ServeHTTP(f.recorder, put) + + if f.recorder.Code != 200 { + t.Fatalf("Expected success but got %v", f.recorder) + } + + var result types.DataResponseV1 + err = util.UnmarshalJSON(f.recorder.Body.Bytes(), &result) + if err != nil { + t.Fatalf("Unexpected error while unmarshalling result: %v", err) + } + + assertMetricsExist(t, result.Metrics, []string{ + "counter_disk_read_keys", + "counter_disk_deleted_keys", + "counter_disk_written_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_disk_write_ns", + "timer_disk_commit_ns", + }) + }) +} + func TestConfigV1(t *testing.T) { f := newFixture(t) @@ -1681,105 +1759,119 @@ func TestDataPutV1IfNoneMatch(t *testing.T) { func TestBundleScope(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } - f := newFixture(t) + for _, v := range []variant{ + {"inmem", nil}, + {"disk", []func(*Server){func(s *Server) { s.WithStore(disk) }}}, + } { + t.Run(v.name, func(t *testing.T) { + f := newFixture(t, v.opts...) - txn := storage.NewTransactionOrDie(ctx, f.server.store, storage.WriteParams) + txn := storage.NewTransactionOrDie(ctx, f.server.store, storage.WriteParams) - if err := bundle.WriteManifestToStore(ctx, f.server.store, txn, "test-bundle", bundle.Manifest{ - Revision: "AAAAA", - Roots: &[]string{"a/b/c", "x/y", "foobar"}, - }); err != nil { - t.Fatal(err) - } + if err := bundle.WriteManifestToStore(ctx, f.server.store, txn, "test-bundle", bundle.Manifest{ + Revision: "AAAAA", + Roots: &[]string{"a/b/c", "x/y", "foobar"}, + }); err != nil { + t.Fatal(err) + } - if err := f.server.store.UpsertPolicy(ctx, txn, "someid", []byte(`package x.y.z`)); err != nil { - t.Fatal(err) - } + if err := f.server.store.UpsertPolicy(ctx, txn, "someid", []byte(`package x.y.z`)); err != nil { + t.Fatal(err) + } - if err := f.server.store.Commit(ctx, txn); err != nil { - t.Fatal(err) - } + if err := f.server.store.Commit(ctx, txn); err != nil { + t.Fatal(err) + } - cases := []tr{ - { - method: "PUT", - path: "/data/a/b", - body: "1", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/a/b/c", - body: "1", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/a/b/c/d", - body: "1", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b/c/d is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/a/b/d", - body: "1", - code: http.StatusNoContent, - }, - { - method: "PATCH", - path: "/data/a", - body: `[{"path": "/b/c", "op": "add", "value": 1}]`, - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, - }, - { - 
method: "DELETE", - path: "/data/a", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/policies/test1", - body: `package a.b`, - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, - }, - { - method: "DELETE", - path: "/policies/someid", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path x/y/z is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/foo/bar", - body: "1", - code: http.StatusNoContent, - }, - { - method: "PUT", - path: "/data/foo", - body: "1", - code: http.StatusNoContent, - }, - { - method: "PUT", - path: "/data", - body: `{"a": "b"}`, - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "can't write to document root with bundle roots configured"}`, - }, - } + cases := []tr{ + { + method: "PUT", + path: "/data/a/b", + body: "1", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/a/b/c", + body: "1", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/a/b/c/d", + body: "1", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b/c/d is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/a/b/d", + body: "1", + code: http.StatusNoContent, + }, + { + method: "PATCH", + path: "/data/a", + body: `[{"path": "/b/c", "op": "add", "value": 1}]`, + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, + }, + { + method: "DELETE", + path: "/data/a", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/policies/test1", + body: `package a.b`, + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, + }, + { + method: "DELETE", + path: "/policies/someid", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path x/y/z is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/foo/bar", + body: "1", + code: http.StatusNoContent, + }, + { + method: "PUT", + path: "/data/foo", + body: "1", + code: http.StatusNoContent, + }, + { + method: "PUT", + path: "/data", + body: `{"a": "b"}`, + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "can't write to document root with bundle roots configured"}`, + }, + } - if err := f.v1TestRequests(cases); err != nil { - t.Fatal(err) - } + if err := f.v1TestRequests(cases); err != nil { + t.Fatal(err) + } + }) + } + }) } func TestBundleScopeMultiBundle(t *testing.T) { @@ -2304,51 +2396,76 @@ func TestDataProvenanceMultiBundle(t *testing.T) { } func TestDataMetricsEval(t *testing.T) { - f := newFixture(t) + // These tests all use the POST /v1/data API with ?metrics appended. + // We're setting up the disk store because that injects a few extra metrics, + // which storage/inmem does not. 
- // Make a request to evaluate `data` - testDataMetrics(t, f, "/data?metrics", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_query_parse_ns", - "timer_rego_query_compile_ns", - "timer_rego_query_eval_ns", - "timer_server_handler_ns", - "timer_rego_external_resolve_ns", - }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } - // Repeat previous request, expect to have hit the query cache - // so fewer timers should have been reported. - testDataMetrics(t, f, "/data?metrics", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_query_eval_ns", - "timer_server_handler_ns", - "timer_rego_external_resolve_ns", - }) + f := newFixtureWithStore(t, disk) + + // Make a request to evaluate `data` + testDataMetrics(t, f, "/data?metrics", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_query_parse_ns", + "timer_rego_query_compile_ns", + "timer_rego_query_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_rego_external_resolve_ns", + }) - // Make a request to evaluate `data` and use partial evaluation, - // this should not hit the same query cache result as the previous - // request. - testDataMetrics(t, f, "/data?metrics&partial", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_module_compile_ns", - "timer_rego_query_parse_ns", - "timer_rego_query_compile_ns", - "timer_rego_query_eval_ns", - "timer_rego_partial_eval_ns", - "timer_server_handler_ns", - "timer_rego_external_resolve_ns", - }) + // Repeat previous request, expect to have hit the query cache + // so fewer timers should have been reported. + testDataMetrics(t, f, "/data?metrics", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_query_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_rego_external_resolve_ns", + }) + + // Make a request to evaluate `data` and use partial evaluation, + // this should not hit the same query cache result as the previous + // request. 
+ testDataMetrics(t, f, "/data?metrics&partial", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_module_compile_ns", + "timer_rego_query_parse_ns", + "timer_rego_query_compile_ns", + "timer_rego_query_eval_ns", + "timer_rego_partial_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_rego_external_resolve_ns", + }) - // Repeat previous partial eval request, this time it should - // be cached - testDataMetrics(t, f, "/data?metrics&partial", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_query_eval_ns", - "timer_server_handler_ns", + // Repeat previous partial eval request, this time it should + // be cached + testDataMetrics(t, f, "/data?metrics&partial", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_query_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + }) }) } @@ -2363,9 +2480,14 @@ func testDataMetrics(t *testing.T, f *fixture, url string, expected []string) { if err := util.NewJSONDecoder(f.recorder.Body).Decode(&result); err != nil { t.Fatalf("Unexpected JSON decode error: %v", err) } + assertMetricsExist(t, result.Metrics, expected) +} + +func assertMetricsExist(t *testing.T, metrics types.MetricsV1, expected []string) { + t.Helper() for _, key := range expected { - v, ok := result.Metrics[key] + v, ok := metrics[key] if !ok { t.Errorf("Missing expected metric: %s", key) } else if v == nil { @@ -2374,8 +2496,8 @@ func testDataMetrics(t *testing.T, f *fixture, url string, expected []string) { } - if len(expected) != len(result.Metrics) { - t.Errorf("Expected %d metrics, got %d\n\n\tValues: %+v", len(expected), len(result.Metrics), result.Metrics) + if len(expected) != len(metrics) { + t.Errorf("Expected %d metrics, got %d\n\n\tValues: %+v", len(expected), len(metrics), metrics) } } @@ -3139,31 +3261,50 @@ func TestDecisionLogErrorMessage(t *testing.T) { } func TestQueryV1(t *testing.T) { - f := newFixture(t) - get := newReqV1(http.MethodGet, `/query?q=a=[1,2,3]%3Ba[i]=x`, "") - f.server.Handler.ServeHTTP(f.recorder, get) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } - if f.recorder.Code != 200 { - t.Fatalf("Expected success but got %v", f.recorder) - } + f := newFixtureWithStore(t, disk) + get := newReqV1(http.MethodGet, `/query?q=a=[1,2,3]%3Ba[i]=x&metrics`, "") + f.server.Handler.ServeHTTP(f.recorder, get) - var expected types.QueryResponseV1 - err := util.UnmarshalJSON([]byte(`{ + if f.recorder.Code != 200 { + t.Fatalf("Expected success but got %v", f.recorder) + } + + var expected types.QueryResponseV1 + err = util.UnmarshalJSON([]byte(`{ "result": [{"a":[1,2,3],"i":0,"x":1},{"a":[1,2,3],"i":1,"x":2},{"a":[1,2,3],"i":2,"x":3}] }`), &expected) - if err != nil { - panic(err) - } + if err != nil { + panic(err) + } - var result types.QueryResponseV1 - err = util.UnmarshalJSON(f.recorder.Body.Bytes(), &result) - if err != nil { - t.Fatalf("Unexpected error while unmarshalling result: %v", err) - } + var result types.QueryResponseV1 + err = util.UnmarshalJSON(f.recorder.Body.Bytes(), &result) + if err != nil { + t.Fatalf("Unexpected error while unmarshalling result: %v", err) + } - if !reflect.DeepEqual(result, expected) 
{ - t.Fatalf("Expected %v but got: %v", expected, result) - } + assertMetricsExist(t, result.Metrics, []string{ + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_query_compile_ns", + "timer_rego_query_eval_ns", + // "timer_server_handler_ns", // TODO(sr): we're not consistent about timing this? + "timer_disk_read_ns", + }) + + result.Metrics = nil + if !reflect.DeepEqual(result, expected) { + t.Fatalf("Expected %v but got: %v", expected, result) + } + }) } func TestBadQueryV1(t *testing.T) { @@ -3620,26 +3761,24 @@ type fixture struct { func newFixture(t *testing.T, opts ...func(*Server)) *fixture { ctx := context.Background() - store := inmem.New() - m, err := plugins.New([]byte{}, "test", store) - if err != nil { - panic(err) - } - - if err := m.Start(ctx); err != nil { - panic(err) - } - server := New(). WithAddresses([]string{"localhost:8182"}). - WithStore(store). - WithManager(m) + WithStore(inmem.New()) // potentially overridden via opts for _, opt := range opts { opt(server) } + + m, err := plugins.New([]byte{}, "test", server.store) + if err != nil { + t.Fatal(err) + } + server = server.WithManager(m) + if err := m.Start(ctx); err != nil { + t.Fatal(err) + } server, err = server.Init(ctx) if err != nil { - panic(err) + t.Fatal(err) } recorder := httptest.NewRecorder() @@ -3684,7 +3823,7 @@ func newFixtureWithStore(t *testing.T, store storage.Store, opts ...func(*Server func (f *fixture) v1TestRequests(trs []tr) error { for i, tr := range trs { if err := f.v1(tr.method, tr.path, tr.body, tr.code, tr.resp); err != nil { - return errors.Wrapf(err, "error on test request #%d", i+1) + return fmt.Errorf("error on test request #%d: %w", i+1, err) } } return nil @@ -3750,13 +3889,22 @@ func (f *fixture) reset() { f.recorder = httptest.NewRecorder() } -func executeRequests(t *testing.T, reqs []tr) { +type variant struct { + name string + opts []func(*Server) +} + +func executeRequests(t *testing.T, reqs []tr, variants ...variant) { t.Helper() - f := newFixture(t) - for i, req := range reqs { - if err := f.v1(req.method, req.path, req.body, req.code, req.resp); err != nil { - t.Errorf("Unexpected response on request %d: %v", i+1, err) - } + for _, v := range variants { + t.Run(v.name, func(t *testing.T) { + f := newFixture(t, v.opts...) 
+ for i, req := range reqs { + if err := f.v1(req.method, req.path, req.body, req.code, req.resp); err != nil { + t.Errorf("Unexpected response on request %d: %v", i+1, err) + } + } + }) } } @@ -4145,17 +4293,17 @@ func TestDistributedTracingDisabled(t *testing.T) { type mockHTTPHandler struct{} -func (m *mockHTTPHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { +func (*mockHTTPHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) } type mockMetricsProvider struct{} -func (m *mockMetricsProvider) RegisterEndpoints(registrar func(path, method string, handler http.Handler)) { +func (*mockMetricsProvider) RegisterEndpoints(registrar func(string, string, http.Handler)) { registrar("/metrics", "GET", new(mockHTTPHandler)) } -func (m *mockMetricsProvider) InstrumentHandler(handler http.Handler, label string) http.Handler { +func (*mockMetricsProvider) InstrumentHandler(handler http.Handler, _ string) http.Handler { return handler } @@ -4167,21 +4315,19 @@ type mockHTTPListener struct { t httpListenerType } -var _ httpListener = (*mockHTTPListener)(nil) - func (m mockHTTPListener) Addr() string { return m.addrs } -func (m mockHTTPListener) ListenAndServe() error { +func (mockHTTPListener) ListenAndServe() error { return errors.New("not implemented") } -func (m mockHTTPListener) ListenAndServeTLS(certFile, keyFile string) error { +func (mockHTTPListener) ListenAndServeTLS(string, string) error { return errors.New("not implemented") } -func (m mockHTTPListener) Shutdown(ctx context.Context) error { +func (m mockHTTPListener) Shutdown(context.Context) error { var err error if m.shutdownHook != nil { err = m.shutdownHook() diff --git a/storage/disk/config.go b/storage/disk/config.go new file mode 100644 index 0000000000..e1019dbde6 --- /dev/null +++ b/storage/disk/config.go @@ -0,0 +1,75 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "errors" + "fmt" + "os" + + badger "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/config" + "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/util" +) + +type cfg struct { + Dir string `json:"directory"` + AutoCreate bool `json:"auto_create"` + Partitions []string `json:"partitions"` + Badger string `json:"badger"` +} + +var ErrInvalidPartitionPath = errors.New("invalid storage path") + +// OptionsFromConfig parses the passed config, extracts the disk storage +// settings, validates it, and returns a *Options struct pointer on success. 
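+// If the config does not include a storage.disk section, it returns nil, nil;
+// callers are expected to fall back to a different store (e.g. inmem).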
+func OptionsFromConfig(raw []byte, id string) (*Options, error) { + parsedConfig, err := config.ParseConfig(raw, id) + if err != nil { + return nil, err + } + + if parsedConfig.Storage == nil || len(parsedConfig.Storage.Disk) == 0 { + return nil, nil + } + + var c cfg + if err := util.Unmarshal(parsedConfig.Storage.Disk, &c); err != nil { + return nil, err + } + + if _, err := os.Stat(c.Dir); err != nil { + if os.IsNotExist(err) && c.AutoCreate { + err = os.MkdirAll(c.Dir, 0700) // overwrite err + } + if err != nil { + return nil, fmt.Errorf("directory %v invalid: %w", c.Dir, err) + } + } + + opts := Options{ + Dir: c.Dir, + Badger: c.Badger, + } + for _, path := range c.Partitions { + p, ok := storage.ParsePath(path) + if !ok { + return nil, fmt.Errorf("partition path '%v': %w", path, ErrInvalidPartitionPath) + } + opts.Partitions = append(opts.Partitions, p) + } + + return &opts, nil +} + +func badgerConfigFromOptions(opts Options) badger.Options { + // Set some things _after_ FromSuperFlag to prohibit overriding them + return badger.DefaultOptions(""). + FromSuperFlag(opts.Badger). + WithDir(dataDir(opts.Dir)). + WithValueDir(dataDir(opts.Dir)). + WithDetectConflicts(false) // We only allow one write txn at a time; so conflicts cannot happen. +} diff --git a/storage/disk/config_test.go b/storage/disk/config_test.go new file mode 100644 index 0000000000..7811c123bb --- /dev/null +++ b/storage/disk/config_test.go @@ -0,0 +1,238 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "context" + "errors" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/logging" +) + +func TestNewFromConfig(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "disk_test") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { os.RemoveAll(tmpdir) }) + + for _, tc := range []struct { + note string + config string + err error // gets unwrapped + nothing bool // returns no disk options? 
+ }{ + { + note: "no storage section", + config: "", + nothing: true, + }, + { + note: "successful init, no partitions", + config: ` +storage: + disk: + directory: "` + tmpdir + `" +`, + }, + { + note: "successful init, valid partitions", + config: ` +storage: + disk: + directory: "` + tmpdir + `" + partitions: + - /foo/bar + - /baz +`, + }, + { + note: "partitions invalid", + config: ` +storage: + disk: + directory: "` + tmpdir + `" + partitions: + - /foo/bar + - baz +`, + err: ErrInvalidPartitionPath, + }, + { + note: "directory does not exist", + config: ` +storage: + disk: + directory: "` + tmpdir + `/foobar" +`, + err: os.ErrNotExist, + }, + { + note: "auto-create directory, does not exist", + config: ` +storage: + disk: + auto_create: true + directory: "` + tmpdir + `/foobar" +`, + }, + { + note: "auto-create directory, does already exist", // could be the second run + config: ` +storage: + disk: + auto_create: true + directory: "` + tmpdir + `" +`, + }, + } { + t.Run(tc.note, func(t *testing.T) { + d, err := OptionsFromConfig([]byte(tc.config), "id") + if !errors.Is(err, tc.err) { + t.Errorf("err: expected %v, got %v", tc.err, err) + } + if tc.nothing && d != nil { + t.Errorf("expected no disk options, got %v", d) + } + }) + } +} + +func TestDataDirPrefix(t *testing.T) { + ctx := context.Background() + tmpdir, err := ioutil.TempDir("", "disk_test") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { os.RemoveAll(tmpdir) }) + + d, err := New(ctx, logging.NewNoOpLogger(), nil, Options{ + Dir: tmpdir, + }) + if err != nil { + t.Fatal(err) + } + d.Close(ctx) + + dir := filepath.Join(tmpdir, "data") + if _, err := os.Stat(dir); err != nil { + t.Fatalf("stat %v: %v", dir, err) + } + files, err := os.ReadDir(tmpdir) + if err != nil { + t.Fatal(err) + } + + // We currently only expect a single directory here: "data" + for _, file := range files { + if file.Name() != "data" { + t.Errorf("unexpected file in dir: %v", file.Name()) + } + } +} + +func TestBadgerConfigFromOptions(t *testing.T) { + type check func(*testing.T, badger.Options) + checks := func(c ...check) []check { + return c + } + valueDir := func(exp string) check { + return func(t *testing.T, o badger.Options) { + if act := o.ValueDir; act != exp { + t.Errorf("ValueDir: expected %v, got %v", exp, act) + } + } + } + dir := func(exp string) check { + return func(t *testing.T, o badger.Options) { + if act := o.Dir; act != exp { + t.Errorf("Dir: expected %v, got %v", exp, act) + } + } + } + conflict := func(exp bool) check { + return func(t *testing.T, o badger.Options) { + if act := o.DetectConflicts; act != exp { + t.Errorf("DetectConflicts: expected %v, got %v", exp, act) + } + } + } + nummemtables := func(exp int) check { + return func(t *testing.T, o badger.Options) { + if act := o.NumMemtables; act != exp { + t.Errorf("Dir: expected %v, got %v", exp, act) + } + } + } + numversionstokeep := func(exp int) check { + return func(t *testing.T, o badger.Options) { + if act := o.NumVersionsToKeep; act != exp { + t.Errorf("Dir: expected %v, got %v", exp, act) + } + } + } + + tests := []struct { + note string + opts Options + checks []check + }{ + { + "defaults", + Options{ + Dir: "foo", + }, + checks( + valueDir("foo/data"), + dir("foo/data"), + conflict(false), + ), + }, + { + "valuedir+dir override", + Options{ + Dir: "foo", + Badger: `valuedir="baz"; dir="quz"`, + }, + checks( + valueDir("foo/data"), + dir("foo/data"), + ), + }, + { + "conflict detection override", + Options{ + Dir: "foo", + Badger: `detectconflicts=true`, + 
+			},
+			checks(conflict(false)),
+		},
+		{
+			"two valid overrides", // NOTE(sr): This is just one example
+			Options{
+				Dir:    "foo",
+				Badger: `nummemtables=123; numversionstokeep=123`,
+			},
+			checks(
+				nummemtables(123),
+				numversionstokeep(123),
+			),
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.note, func(t *testing.T) {
+			act := badgerConfigFromOptions(tc.opts)
+			for _, check := range tc.checks {
+				check(t, act)
+			}
+		})
+	}
+}
diff --git a/storage/disk/disk.go b/storage/disk/disk.go
index 348499173a..9e48c36bc2 100644
--- a/storage/disk/disk.go
+++ b/storage/disk/disk.go
@@ -26,6 +26,10 @@
 // to a single key). Similarly, values that fall outside of partitions are
 // stored under individual keys at the root (e.g., the full extent of the value
 // at /qux would be stored under one key.)
+// There is support for wildcards in partitions: {/foo/*} will cause /foo/bar/abc
+// and /foo/buz/def to be written to separate keys. Multiple wildcards are
+// supported (/tenants/*/users/*/bindings), and they can also appear at the end
+// of a partition (/users/*).
 //
 // All keys written by the disk.Store implementation are prefixed as follows:
 //
@@ -57,32 +61,45 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"path/filepath"
+	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	badger "github.com/dgraph-io/badger/v3"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/open-policy-agent/opa/logging"
 	"github.com/open-policy-agent/opa/storage"
 	"github.com/open-policy-agent/opa/util"
 )
 
-// TODO(tsandall): deal w/ slashes in paths
-// TODO(tsandall): support multi-level partitioning for use cases like k8s
 // TODO(tsandall): add support for migrations
+// TODO(sr): validate partition patterns properly: if the partitions were {/foo/bar}
+// before and the new ones are {/foo/*}, it should be OK.
+
+// a value log file can be rewritten if half of its space can be discarded
+const valueLogGCDiscardRatio = 0.5
 
 // Options contains parameters that configure the disk-based store.
 type Options struct {
 	Dir        string         // specifies directory to store data inside of
 	Partitions []storage.Path // data prefixes that enable efficient layout
+	Badger     string         // badger-internal configurables
 }
 
 // Store provides a disk-based implementation of the storage.Store interface.
 type Store struct {
 	db         *badger.DB           // underlying key-value store
 	xid        uint64               // next transaction id
-	mu         sync.Mutex           // synchronizes trigger execution
+	rmu        sync.RWMutex         // reader-writer lock
+	wmu        sync.Mutex           // writer lock
 	pm         *pathMapper          // maps logical storage paths to underlying store keys
 	partitions *partitionTrie       // data structure to support path mapping
 	triggers   map[*handle]struct{} // registered triggers
+	gcTicker   *time.Ticker         // gc ticker
+	close      chan struct{}        // close-only channel for stopping the GC goroutine
 }
 
 const (
@@ -105,7 +122,7 @@ type metadata struct {
 }
 
 // New returns a new disk-based store based on the provided options.
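+// The logger is handed to badger (via the wrap type below) and used for GC
+// and startup diagnostics; prom may be nil, in which case no Prometheus
+// collectors are registered. A minimal call sketch, mirroring the tests in
+// this change:
+//
+//	store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{
+//		Dir:        dir,
+//		Partitions: []storage.Path{storage.MustParsePath("/foo")},
+//	})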
-func New(ctx context.Context, opts Options) (*Store, error) {
+func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, opts Options) (*Store, error) {
 
 	partitions := make(pathSet, len(opts.Partitions))
 	copy(partitions, opts.Partitions)
 
@@ -118,27 +135,73 @@ func New(ctx context.Context, opts Options) (*Store, error) {
 		}
 	}
 
-	db, err := badger.Open(badger.DefaultOptions(opts.Dir).WithLogger(nil))
+	db, err := badger.Open(badgerConfigFromOptions(opts).WithLogger(&wrap{logger}))
 	if err != nil {
 		return nil, wrapError(err)
 	}
 
+	if prom != nil {
+		if err := initPrometheus(prom); err != nil {
+			return nil, err
+		}
+	}
+
 	store := &Store{
 		db:         db,
 		partitions: buildPartitionTrie(partitions),
 		triggers:   map[*handle]struct{}{},
+		close:      make(chan struct{}),
+		gcTicker:   time.NewTicker(time.Minute),
 	}
 
-	return store, db.Update(func(txn *badger.Txn) error {
+	go store.GC(logger)
+
+	if err := db.Update(func(txn *badger.Txn) error {
 		return store.init(ctx, txn, partitions)
-	})
+	}); err != nil {
+		store.Close(ctx)
+		return nil, err
+	}
+
+	return store, store.diagnostics(ctx, partitions, logger)
+}
+
+// GC, started as a goroutine in New, periodically runs badger's value log
+// GC until there is nothing left to collect (controlled by gcTicker and the
+// close channel).
+func (db *Store) GC(logger logging.Logger) {
+	for {
+		select {
+		case <-db.close:
+			return
+		case <-db.gcTicker.C:
+			for err := error(nil); err == nil; err = db.db.RunValueLogGC(valueLogGCDiscardRatio) {
+				logger.Debug("RunValueLogGC: err=%v", err)
+			}
+		}
+	}
 }
 
 // Close finishes the DB connection and allows other processes to acquire it.
 func (db *Store) Close(context.Context) error {
+	db.gcTicker.Stop()
 	return wrapError(db.db.Close())
 }
 
+// If the log level is debug, we'll output the badger logs in their corresponding
+// log levels; if it's not debug, we'll suppress all badger logs.
+type wrap struct {
+	l logging.Logger
+}
+
+func (w *wrap) debugDo(f func(string, ...interface{}), fmt string, as ...interface{}) {
+	if w.l.GetLevel() >= logging.Debug {
+		f("badger: "+fmt, as...)
+	}
+}
+
+func (w *wrap) Debugf(f string, as ...interface{})   { w.debugDo(w.l.Debug, f, as...) }
+func (w *wrap) Infof(f string, as ...interface{})    { w.debugDo(w.l.Info, f, as...) }
+func (w *wrap) Warningf(f string, as ...interface{}) { w.debugDo(w.l.Warn, f, as...) }
+func (w *wrap) Errorf(f string, as ...interface{})   { w.debugDo(w.l.Error, f, as...) }
+
 // NewTransaction implements the storage.Store interface.
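+//
+// Write transactions take the writer lock (wmu), so at most one write txn is
+// in flight at any time; read transactions take the read half of rmu, so any
+// number of them can proceed concurrently. Commit (below) takes rmu
+// exclusively, blocking new readers while the commit and triggers run.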
func (db *Store) NewTransaction(ctx context.Context, params ...storage.TransactionParams) (storage.Transaction, error) { var write bool @@ -150,6 +213,11 @@ func (db *Store) NewTransaction(ctx context.Context, params ...storage.Transacti } xid := atomic.AddUint64(&db.xid, uint64(1)) + if write { + db.wmu.Lock() // only one concurrent write txn + } else { + db.rmu.RLock() + } underlying := db.db.NewTransaction(write) return newTransaction(xid, write, underlying, context, db.pm, db.partitions, db), nil @@ -162,17 +230,23 @@ func (db *Store) Commit(ctx context.Context, txn storage.Transaction) error { return err } if underlying.write { + db.rmu.Lock() // blocks until all readers are done event, err := underlying.Commit(ctx) if err != nil { return err } - db.mu.Lock() - defer db.mu.Unlock() + write := false // read only txn + readOnly := db.db.NewTransaction(write) + xid := atomic.AddUint64(&db.xid, uint64(1)) + readTxn := newTransaction(xid, write, readOnly, nil, db.pm, db.partitions, db) for h := range db.triggers { - h.cb(ctx, txn, event) + h.cb(ctx, readTxn, event) } - } else { + db.rmu.Unlock() + db.wmu.Unlock() + } else { // committing read txn underlying.Abort(ctx) + db.rmu.RUnlock() } return nil } @@ -184,6 +258,11 @@ func (db *Store) Abort(ctx context.Context, txn storage.Transaction) { panic(err) } underlying.Abort(ctx) + if underlying.write { + db.wmu.Unlock() + } else { + db.rmu.RUnlock() + } } // ListPolicies implements the storage.Policy interface. @@ -238,8 +317,6 @@ func (db *Store) Register(_ context.Context, txn storage.Transaction, config sto } } h := &handle{db: db, cb: config.OnCommit} - db.mu.Lock() - defer db.mu.Unlock() db.triggers[h] = struct{}{} return h, nil } @@ -305,9 +382,7 @@ func (h *handle) Unregister(ctx context.Context, txn storage.Transaction) { Message: "triggers must be unregistered with a write transaction", }) } - h.db.mu.Lock() delete(h.db.triggers, h) - h.db.mu.Unlock() } func (db *Store) loadMetadata(txn *badger.Txn, m *metadata) (bool, error) { @@ -424,3 +499,163 @@ func (db *Store) validatePartitions(ctx context.Context, txn *badger.Txn, existi return nil } + +// MakeDir makes Store a storage.MakeDirer, to avoid the superfluous MakeDir +// steps -- MakeDir is implicit in the disk storage's data layout, since +// {"foo": {"bar": {"baz": 10}}} +// writes value `10` to key `/foo/bar/baz`. +// +// Here, we only check if it's a write transaction, for consistency with +// other implementations, and do nothing. +func (db *Store) MakeDir(_ context.Context, txn storage.Transaction, path storage.Path) error { + underlying, err := db.underlying(txn) + if err != nil { + return err + } + if !underlying.write { + return &storage.Error{ + Code: storage.InvalidTransactionErr, + Message: "MakeDir must be called with a write transaction", + } + } + return nil +} + +// diagnostics prints relevant partition and database related information at +// debug level. 
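+// For each partition it logs a line like
+//
+//	partition /foo: key count: 2 (estimated size 50 bytes)
+//
+// via logPrefixStatistics below; with no partitions configured, it warns and
+// reports on the whole data namespace under "/".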
+func (db *Store) diagnostics(ctx context.Context, partitions pathSet, logger logging.Logger) error { + if logger.GetLevel() < logging.Debug { + return nil + } + if len(partitions) == 0 { + logger.Warn("no partitions configured") + if err := db.logPrefixStatistics(ctx, storage.MustParsePath("/"), logger); err != nil { + return err + } + } + for _, partition := range partitions { + if err := db.logPrefixStatistics(ctx, partition, logger); err != nil { + return err + } + } + return nil +} + +func (db *Store) logPrefixStatistics(ctx context.Context, partition storage.Path, logger logging.Logger) error { + + if prefix, ok := hasWildcard(partition); ok { + return db.logPrefixStatisticsWildcardPartition(ctx, prefix, partition, logger) + } + + key, err := db.pm.DataPrefix2Key(partition) + if err != nil { + return err + } + + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + opt.Prefix = key + + var count, size uint64 + if err := db.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + if err := ctx.Err(); err != nil { + return err + } + count++ + size += uint64(it.Item().EstimatedSize()) // key length + value length + } + return nil + }); err != nil { + return err + } + logger.Debug("partition %s: key count: %d (estimated size %d bytes)", partition, count, size) + return nil +} + +func hasWildcard(path storage.Path) (storage.Path, bool) { + for i := range path { + if path[i] == pathWildcard { + return path[:i], true + } + } + return nil, false +} + +func (db *Store) logPrefixStatisticsWildcardPartition(ctx context.Context, prefix, partition storage.Path, logger logging.Logger) error { + // we iterate all keys, and count things according to their concrete partition + type diagInfo struct{ count, size uint64 } + diag := map[string]*diagInfo{} + + key, err := db.pm.DataPrefix2Key(prefix) + if err != nil { + return err + } + + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + opt.Prefix = key + if err := db.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + if err := ctx.Err(); err != nil { + return err + } + if part, ok := db.prefixInPattern(it.Item().Key(), partition); ok { + p := part.String() + if diag[p] == nil { + diag[p] = &diagInfo{} + } + diag[p].count++ + diag[p].size += uint64(it.Item().EstimatedSize()) // key length + value length + } + } + return nil + }); err != nil { + return err + } + if len(diag) == 0 { + logger.Debug("partition pattern %s: key count: 0 (estimated size 0 bytes)", toString(partition)) + } + for part, diag := range diag { + logger.Debug("partition %s (pattern %s): key count: %d (estimated size %d bytes)", part, toString(partition), diag.count, diag.size) + } + return nil +} + +func (db *Store) prefixInPattern(key []byte, partition storage.Path) (storage.Path, bool) { + var part storage.Path + path, err := db.pm.DataKey2Path(key) + if err != nil { + return nil, false + } + for i := range partition { + if path[i] != partition[i] && partition[i] != pathWildcard { + return nil, false + } + part = append(part, path[i]) + } + return part, true +} + +func toString(path storage.Path) string { + if len(path) == 0 { + return "/" + } + buf := strings.Builder{} + for _, p := range path { + fmt.Fprintf(&buf, "/%s", p) + } + return buf.String() +} + +// dataDir prefixes the configured storage location: what it returns is +// what we have badger write its files to. 
It is done to give us some +// wiggle room in the future should we need to put further files on the +// file system (like backups): we can then just use the opts.Dir. +func dataDir(dir string) string { + return filepath.Join(dir, "data") +} diff --git a/storage/disk/disk_test.go b/storage/disk/disk_test.go index 4a097fae2e..290878568a 100644 --- a/storage/disk/disk_test.go +++ b/storage/disk/disk_test.go @@ -7,9 +7,13 @@ package disk import ( "bytes" "context" + "reflect" "strings" "testing" + badger "github.com/dgraph-io/badger/v3" + + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/util/test" @@ -32,11 +36,69 @@ type testWriteError struct { value string } +// testCount lets you assert the number of keys under a prefix. +// Note that we don't do exact matches, so the assertions should be +// as exact as possible: +// testCount{"/foo", 1} +// testCount{"/foo/bar", 1} +// both of these would be true for one element under key `/foo/bar`. +type testCount struct { + key string + count int +} + +func (tc *testCount) assert(t *testing.T, s *Store) { + t.Helper() + key, err := s.pm.DataPath2Key(storage.MustParsePath(tc.key)) + if err != nil { + t.Fatal(err) + } + + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + opt.Prefix = key + + var count int + if err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + count++ + } + return nil + }); err != nil { + t.Fatal(err) + } + + if tc.count != count { + t.Errorf("key %v: expected %d keys, found %d", tc.key, tc.count, count) + } +} + +type testDump struct{} // for debugging purposes + +func (*testDump) do(t *testing.T, s *Store) { + t.Helper() + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = true + + if err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + t.Logf("%v -> %v", string(it.Item().Key()), it.Item().ValueSize()) + } + return nil + }); err != nil { + t.Fatal(err) + } +} + func TestPolicies(t *testing.T) { test.WithTempFS(map[string]string{}, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: nil}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: nil}) if err != nil { t.Fatal(err) } @@ -101,6 +163,9 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn := func(ctx context.Context, s *Store) { t.Helper() + if s == nil { + return + } if err := s.Close(ctx); err != nil { t.Fatal(err) } @@ -110,7 +175,7 @@ func TestDataPartitioningValidation(t *testing.T) { ctx := context.Background() - _, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + _, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/bar/baz"), }}) @@ -123,7 +188,7 @@ func TestDataPartitioningValidation(t *testing.T) { t.Fatal("unexpected code or message, got:", err) } - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/baz"), }}) @@ -133,7 +198,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, 
logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), }}) @@ -143,7 +208,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -164,7 +229,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux/corge"), @@ -175,7 +240,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -187,7 +252,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -199,7 +264,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -233,6 +298,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo/bar", exp: `"x"`, }, + testCount{"/foo/bar", 1}, }, }, { @@ -248,6 +314,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo/bar/baz", exp: `"x"`, }, + testCount{"/foo/bar/baz", 1}, }, }, { @@ -263,6 +330,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/deadbeef", exp: `"x"`, }, + testCount{"/foo", 0}, + testCount{"/deadbeef", 1}, }, }, { @@ -302,6 +371,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo/bar/baz", exp: `8`, }, + testCount{"/foo/bar", 1}, + testCount{"/foo/bar/baz", 0}, }, }, { @@ -365,7 +436,27 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, { - note: "read-modify-write: add: array overwrite", + note: "read-modify-write: add: array append (via last index)", + partitions: []string{"/foo"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/bar", + value: `[1]`, + }, + testWrite{ + op: storage.AddOp, + path: "/foo/bar/1", + value: `8`, + }, + testRead{ + path: "/foo/bar", + exp: `[1, 8]`, + }, + }, + }, + { + note: "read-modify-write: add: array insert", partitions: []string{"/foo"}, sequence: []interface{}{ testWrite{ @@ -380,7 +471,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, testRead{ path: "/foo/bar", - exp: `[8]`, + exp: `[8, 7]`, }, }, }, @@ -404,6 +495,26 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, }, + { + note: "read-modify-write: replace: array", + partitions: []string{"/foo"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: 
"/foo/bar", + value: `[7]`, + }, + testWrite{ + op: storage.ReplaceOp, + path: "/foo/bar/0", + value: `8`, + }, + testRead{ + path: "/foo/bar", + exp: `[8]`, + }, + }, + }, { note: "read-modify-write: remove", partitions: []string{"/foo"}, @@ -423,6 +534,25 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, }, + { + note: "read-modify-write: remove: array", + partitions: []string{"/foo"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/bar", + value: `[7, 8]`, + }, + testWrite{ + op: storage.RemoveOp, + path: "/foo/bar/0", + }, + testRead{ + path: "/foo/bar", + exp: `[8]`, + }, + }, + }, { note: "read-modify-write: multi-level: map", partitions: []string{"/foo"}, @@ -472,6 +602,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo", value: `{"bar": 7, "baz": 8}`, }, + + testCount{"/foo", 2}, testRead{ path: "/foo/bar", exp: `7`, @@ -592,6 +724,87 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, }, + { + note: "pattern partitions: middle wildcard: match", + partitions: []string{"/foo/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/a/bar", + value: `{"baz": 7}`, + }, + testCount{"/foo/a/bar/baz", 1}, + testRead{"/foo/a/bar/baz", `7`}, + }, + }, + { + note: "pattern partitions: middle wildcard: no-match", + partitions: []string{"/foo/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/b/baz", + value: `{"quz": 1}`, + }, + testCount{"/foo/b/baz", 1}, + testCount{"/foo/b/baz/quz", 0}, + testRead{"/foo/b/baz/quz", `1`}, + }, + }, + { + note: "pattern partitions: middle wildcard: partial match", + partitions: []string{"/foo/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/b", + value: `{"bar": {"quz": 1}, "x": "y"}`, + }, + testCount{"/foo/b/bar/quz", 1}, + testRead{"/foo/b/bar/quz", `1`}, + testCount{"/foo/b/x", 1}, + testRead{"/foo/b/x", `"y"`}, + }, + }, + { + note: "pattern partitions: 2x middle wildcard: partial match", + partitions: []string{"/foo/*/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/b/c", + value: `{"bar": {"quz": 1}, "x": "y"}`, + }, + testCount{"/foo/b/c/bar/quz", 1}, + testRead{"/foo/b/c/bar/quz", `1`}, + testCount{"/foo/b/c/x", 1}, + testRead{"/foo/b/c/x", `"y"`}, + }, + }, + { + note: "pattern partitions: wildcard at the end", + partitions: []string{"/users/*"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/users", + value: `{"alice": {"bar": {"quz": 1}, "x": "y"}}`, + }, + testWrite{ + op: storage.AddOp, + path: "/users/bob", + value: `{"baz": {"one": 1}, "y": "x"}`, + }, + testCount{"/users/alice/bar", 1}, + testRead{"/users/alice/bar/quz", `1`}, + testCount{"/users/alice/x", 1}, + testRead{"/users/alice/x", `"y"`}, + testCount{"/users/bob/baz", 1}, + testRead{"/users/bob/baz/one", `1`}, + testCount{"/users/bob/y", 1}, + testRead{"/users/bob/y", `"x"`}, + }, + }, } for _, tc := range tests { @@ -604,7 +817,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { } ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: partitions}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: partitions}) if err != nil { t.Fatal(err) } @@ -612,6 +825,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { for _, x := range tc.sequence { switch x := x.(type) { + case testCount: + x.assert(t, s) case testWrite: executeTestWrite(ctx, t, s, x) case testRead: @@ -626,6 +841,8 @@ func 
TestDataPartitioningReadsAndWrites(t *testing.T) { if cmp := util.Compare(result, exp); cmp != 0 { t.Fatalf("expected %v but got %v", x.exp, result) } + case testDump: + x.do(t, s) default: panic("unexpected type") } @@ -752,7 +969,7 @@ func TestDataPartitioningReadNotFoundErrors(t *testing.T) { } ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: partitions}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: partitions}) if err != nil { t.Fatal(err) } @@ -786,60 +1003,6 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { partitions []string sequence []interface{} }{ - { - note: "unpartitioned", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWriteError{ - op: storage.AddOp, - path: "/does/notexist", - value: `7`, - }, - }, - }, - { - note: "unpartitioned: nested", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWrite{ - op: storage.AddOp, - path: "/deadbeef", - value: `{}`, - }, - testWriteError{ - op: storage.AddOp, - path: "/deadbeef/x/y", - value: `{}`, - }, - }, - }, - { - note: "partitioned: nested", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWriteError{ - op: storage.AddOp, - path: "/foo/bar/baz", - value: `{}`, - }, - }, - }, - { - note: "partitioned: nested: 2-level", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWrite{ - op: storage.AddOp, - path: "/foo/bar", - value: `{}`, - }, - testWriteError{ - op: storage.AddOp, - path: "/foo/bar/baz/qux", - value: `7`, - }, - }, - }, { note: "patch: remove: non-existent key", partitions: []string{}, @@ -958,7 +1121,7 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { } ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: partitions}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: partitions}) if err != nil { t.Fatal(err) @@ -994,7 +1157,7 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { func TestDataPartitioningWriteInvalidPatchError(t *testing.T) { test.WithTempFS(map[string]string{}, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo"), }}) if err != nil { @@ -1042,3 +1205,274 @@ func executeTestWrite(ctx context.Context, t *testing.T, s storage.Store, x test t.Fatal(err) } } + +func TestDiskTriggers(t *testing.T) { + test.WithTempFS(map[string]string{}, func(dir string) { + ctx := context.Background() + store, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/foo"), + }}) + if err != nil { + t.Fatal(err) + } + defer store.Close(ctx) + writeTxn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + readTxn := storage.NewTransactionOrDie(ctx, store) + + _, err = store.Register(ctx, readTxn, storage.TriggerConfig{ + OnCommit: func(context.Context, storage.Transaction, storage.TriggerEvent) {}, + }) + + if err == nil || !storage.IsInvalidTransaction(err) { + t.Fatalf("Expected transaction error: %v", err) + } + + store.Abort(ctx, readTxn) + + var event storage.TriggerEvent + modifiedPath := storage.MustParsePath("/a") + expectedValue := "hello" + + _, err = store.Register(ctx, writeTxn, storage.TriggerConfig{ + OnCommit: func(ctx context.Context, txn storage.Transaction, evt storage.TriggerEvent) { + result, err := store.Read(ctx, txn, modifiedPath) 
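+				// Commit hands triggers a fresh read-only txn (see Store.Commit),
+				// so the Read above observes the data written by the committed
+				// write txn.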
+				if err != nil || !reflect.DeepEqual(result, expectedValue) {
+					t.Fatalf("Expected result to be hello for trigger read but got: %v (err: %v)", result, err)
+				}
+				event = evt
+			},
+		})
+		if err != nil {
+			t.Fatalf("Failed to register callback: %v", err)
+		}
+
+		if err := store.Write(ctx, writeTxn, storage.ReplaceOp, modifiedPath, expectedValue); err != nil {
+			t.Fatalf("Unexpected write error: %v", err)
+		}
+
+		id := "test"
+		data := []byte("package abc")
+		if err := store.UpsertPolicy(ctx, writeTxn, id, data); err != nil {
+			t.Fatalf("Unexpected upsert error: %v", err)
+		}
+
+		if err := store.Commit(ctx, writeTxn); err != nil {
+			t.Fatalf("Unexpected commit error: %v", err)
+		}
+
+		if event.IsZero() || !event.PolicyChanged() || !event.DataChanged() {
+			t.Fatalf("Expected policy and data change but got: %v", event)
+		}
+
+		expData := storage.DataEvent{Path: modifiedPath, Data: expectedValue, Removed: false}
+		if d := event.Data[0]; !reflect.DeepEqual(expData, d) {
+			t.Fatalf("Expected data event %v, got %v", expData, d)
+		}
+
+		expPolicy := storage.PolicyEvent{ID: id, Data: data, Removed: false}
+		if p := event.Policy[0]; !reflect.DeepEqual(expPolicy, p) {
+			t.Fatalf("Expected policy event %v, got %v", expPolicy, p)
+		}
+	})
+}
+
+func TestDiskDiagnostics(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("no partitions", func(t *testing.T) {
+		test.WithTempFS(nil, func(dir string) {
+			buf := bytes.Buffer{}
+			logger := logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err := New(ctx, logger, nil, Options{Dir: dir})
+			if err != nil {
+				t.Fatal(err)
+			}
+			// store something, won't show up in the logs yet (they're calculated on startup only)
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`{"baz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/bar"), util.MustUnmarshalJSON([]byte(`{"quz": 2000}`))); err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected := []string{
+				`level=warning msg="no partitions configured"`,
+				`level=debug msg="partition /: key count: 0 (estimated size 0 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+
+			// re-open
+			buf = bytes.Buffer{}
+			logger = logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err = New(ctx, logger, nil, Options{Dir: dir})
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected = []string{
+				`level=debug msg="partition /: key count: 2 (estimated size 50 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+		})
+	})
+
+	t.Run("two partitions", func(t *testing.T) {
+		test.WithTempFS(nil, func(dir string) {
+			opts := Options{
+				Dir: dir,
+				Partitions: []storage.Path{
+					storage.MustParsePath("/foo"),
+					storage.MustParsePath("/bar"),
+				}}
+			buf := bytes.Buffer{}
+			logger := logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err := New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// store something, won't show up in the logs yet (they're calculated on startup only)
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`{"baz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected := []string{
+				`level=debug msg="partition /bar: key count: 0 (estimated size 0 bytes)"`,
+				`level=debug msg="partition /foo: key count: 0 (estimated size 0 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+
+			// re-open
+			buf = bytes.Buffer{}
+			logger = logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err = New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected = []string{
+				`level=debug msg="partition /bar: key count: 0 (estimated size 0 bytes)"`,
+				`level=debug msg="partition /foo: key count: 1 (estimated size 21 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+		})
+	})
+
+	t.Run("patterned partitions", func(t *testing.T) {
+		test.WithTempFS(nil, func(dir string) {
+			opts := Options{Dir: dir,
+				Partitions: []storage.Path{
+					storage.MustParsePath("/foo/*/bar"),
+					storage.MustParsePath("/bar"),
+				}}
+			buf := bytes.Buffer{}
+			logger := logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err := New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// store something, won't show up in the logs yet (they're calculated on startup only)
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/foo/x/bar"), util.MustUnmarshalJSON([]byte(`{"baz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/bar"), util.MustUnmarshalJSON([]byte(`{"quz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected := []string{
+				`level=debug msg="partition /bar: key count: 0 (estimated size 0 bytes)"`,
+				`level=debug msg="partition pattern /foo/*/bar: key count: 0 (estimated size 0 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+
+			// re-open
+			buf = bytes.Buffer{}
+			logger = logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err = New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected = []string{
+				`level=debug msg="partition /bar: key count: 1 (estimated size 21 bytes)"`,
+				`level=debug msg="partition /foo/x/bar (pattern /foo/*/bar): key count: 1 (estimated size 27 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+		})
+	})
+}
diff --git a/storage/disk/example_test.go b/storage/disk/example_test.go
index 88fefbab13..cd3871ff8d 100644
--- a/storage/disk/example_test.go
+++ b/storage/disk/example_test.go
@@ -10,6 +10,7 @@ import (
 	"io/ioutil"
 	"os"
 
+	"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/util" @@ -33,7 +34,7 @@ func Example_store() { defer os.RemoveAll(dir) // Create a new disk-based store. - store, err := disk.New(ctx, disk.Options{ + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/authz/tenants"), @@ -63,7 +64,7 @@ func Example_store() { check(err) // Re-create the disk-based store using the same options. - store2, err := disk.New(ctx, disk.Options{ + store2, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/authz/tenants"), diff --git a/storage/disk/metrics.go b/storage/disk/metrics.go new file mode 100644 index 0000000000..f7e53eba57 --- /dev/null +++ b/storage/disk/metrics.go @@ -0,0 +1,51 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // storage read transactions never delete or write + keysReadPerStoreRead = newHist("keys_read_per_store_read_txn", "How many database reads had to occur for a storage read transaction") + bytesReadPerStoreRead = newHist("key_bytes_read_per_store_read_txn", "How many bytes of data were read for a storage read transaction") + + keysReadPerStoreWrite = newHist("keys_read_per_store_write_txn", "How many database reads had to occur for a storage write transaction") + keysWrittenPerStoreWrite = newHist("keys_written_per_store_write_txn", "How many database writes had to occur for a storage write transaction") + keysDeletedPerStoreWrite = newHist("keys_deleted_per_store_write_txn", "How many database writes had to occur for a storage write transaction") + bytesReadPerStoreWrite = newHist("key_bytes_read_per_store_write_txn", "How many bytes of data were read for a storage write transaction") +) + +func initPrometheus(reg prometheus.Registerer) error { + for _, hist := range []prometheus.Histogram{ + keysReadPerStoreRead, + bytesReadPerStoreRead, + keysReadPerStoreWrite, + keysWrittenPerStoreWrite, + keysDeletedPerStoreWrite, + bytesReadPerStoreWrite, + } { + if err := reg.Register(hist); err != nil { + return err + } + } + return nil +} + +func newHist(name, desc string) prometheus.Histogram { + return prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: name, + Help: desc, + Buckets: prometheus.LinearBuckets(1, 1, 10), // TODO different buckets? exp? 
+ }) +} + +func forwardMetric(m map[string]interface{}, counter string, hist prometheus.Histogram) { + key := "counter_" + counter + if s, ok := m[key]; ok { + hist.Observe(float64(s.(uint64))) + } +} diff --git a/storage/disk/partition.go b/storage/disk/partition.go index adee4fe153..a10fc96405 100644 --- a/storage/disk/partition.go +++ b/storage/disk/partition.go @@ -29,7 +29,12 @@ func newPartitionTrie() *partitionTrie { func (p *partitionTrie) Find(path storage.Path) (int, *partitionTrie) { node := p for i, x := range path { - next, ok := node.partitions[x] + next, ok := node.partitions[pathWildcard] + if ok { + node = next + continue + } + next, ok = node.partitions[x] if !ok { return i + 1, nil } diff --git a/storage/disk/partition_test.go b/storage/disk/partition_test.go index 506b40533b..29fc830ca8 100644 --- a/storage/disk/partition_test.go +++ b/storage/disk/partition_test.go @@ -18,11 +18,13 @@ func TestPartitionTrie(t *testing.T) { storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/baz/qux"), storage.MustParsePath("/corge"), + storage.MustParsePath("/tenants/*/bindings"), // wildcard in the middle + storage.MustParsePath("/users/*"), // wildcard at the end }) // Assert on counts... - if len(root.partitions) != 2 { - t.Fatal("expected root to contain two partitions") + if exp, act := 4, len(root.partitions); exp != act { + t.Fatalf("expected root to contain %d partitions, got %d", exp, act) } if len(root.partitions["foo"].partitions) != 2 { @@ -82,6 +84,22 @@ func TestPartitionTrie(t *testing.T) { path: "/deadbeef", wantIdx: 1, wantPtr: nil, + }, { + path: "/tenants/deadbeef/bindings/user01", + wantIdx: 4, + wantPtr: nil, + }, { + path: "/tenants/deadbeef/bindings", + wantIdx: 3, + wantPtr: root.partitions["tenants"].partitions["*"].partitions["bindings"], + }, { + path: "/tenants/deadbeef/foo", + wantIdx: 3, + wantPtr: nil, + }, { + path: "/users/deadbeef", + wantIdx: 2, + wantPtr: root.partitions["users"].partitions["*"], }, } diff --git a/storage/disk/paths.go b/storage/disk/paths.go index c052421e2b..04864087b5 100644 --- a/storage/disk/paths.go +++ b/storage/disk/paths.go @@ -11,6 +11,8 @@ import ( "github.com/open-policy-agent/opa/storage" ) +const pathWildcard = "*" + type pathMapper struct { dataPrefix string dataPrefixNoTrailingSlash string @@ -38,7 +40,7 @@ func (pm *pathMapper) PolicyID2Key(id string) []byte { } func (pm *pathMapper) DataKey2Path(key []byte) (storage.Path, error) { - p, ok := storage.ParsePath(string(key)) + p, ok := storage.ParsePathEscaped(string(key)) if !ok { return nil, &storage.Error{Code: storage.InternalErr, Message: fmt.Sprintf("corrupt key: %s", key)} } @@ -66,7 +68,7 @@ func (ps pathSet) IsDisjoint() bool { for i := range ps { for j := range ps { if i != j { - if ps[i].HasPrefix(ps[j]) || ps[j].HasPrefix(ps[i]) { + if hasPrefixWithWildcard(ps[i], ps[j]) { return false } } @@ -75,6 +77,23 @@ func (ps pathSet) IsDisjoint() bool { return true } +// hasPrefixWithWildcard returns true if p starts with other; respecting +// wildcards +func hasPrefixWithWildcard(p, other storage.Path) bool { + if len(other) > len(p) { + return false + } + for i := range other { + if p[i] == pathWildcard || other[i] == pathWildcard { + continue + } + if p[i] != other[i] { + return false + } + } + return true +} + func (ps pathSet) Diff(other pathSet) pathSet { diff := pathSet{} for _, x := range ps { diff --git a/storage/disk/paths_test.go b/storage/disk/paths_test.go new file mode 100644 index 0000000000..92993a2d89 --- /dev/null +++ 
b/storage/disk/paths_test.go @@ -0,0 +1,59 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "testing" + + "github.com/open-policy-agent/opa/storage" +) + +func TestIsDisjoint(t *testing.T) { + paths := func(ps ...string) pathSet { + ret := make([]storage.Path, len(ps)) + for i := range ps { + ret[i] = storage.MustParsePath(ps[i]) + } + return ret + } + + for _, tc := range []struct { + note string + ps pathSet + overlapped bool + }{ + { + note: "simple disjoint", + ps: paths("/foo", "/bar", "/baz"), + }, + { + note: "simple overlapping", + ps: paths("/foo", "/foo/bar"), + overlapped: true, + }, + { + note: "three overlapping", + ps: paths("/fox", "/foo/bar", "/foo"), + overlapped: true, + }, + { + note: "wildcard overlapping, last", + ps: paths("/foo", "/foo/*"), + overlapped: true, + }, + { + note: "wildcard overlapping, middle", + ps: paths("/foo/bar/baz", "/foo/*/baz"), + overlapped: true, + }, + } { + t.Run(tc.note, func(t *testing.T) { + act := tc.ps.IsDisjoint() + if !tc.overlapped != act { + t.Errorf("path set: %v, disjoint == %v, expected %v", tc.ps, act, !tc.overlapped) + } + }) + } +} diff --git a/storage/disk/txn.go b/storage/disk/txn.go index cd62379841..f51f74763b 100644 --- a/storage/disk/txn.go +++ b/storage/disk/txn.go @@ -9,27 +9,52 @@ import ( "context" "encoding/json" "fmt" + "strconv" badger "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/storage/internal/errors" "github.com/open-policy-agent/opa/storage/internal/ptr" "github.com/open-policy-agent/opa/util" ) +const ( + readValueBytesCounter = "disk_read_bytes" + readKeysCounter = "disk_read_keys" + writtenKeysCounter = "disk_written_keys" + deletedKeysCounter = "disk_deleted_keys" + + commitTimer = "disk_commit" + readTimer = "disk_read" + writeTimer = "disk_write" +) + type transaction struct { - underlying *badger.Txn // handle for the underlying badger transaction - partitions *partitionTrie // index for partitioning structure in underlying store - pm *pathMapper // used for mapping between logical storage paths and actual storage keys - db *Store // handle for the database this transaction was created on - xid uint64 // unique id for this transaction - stale bool // bit to indicate if the transaction was already aborted/committed - write bool // bit to indicate if the transaction may perform writes - context *storage.Context // context supplied by the caller to be included in triggers + underlying *badger.Txn // handle for the underlying badger transaction + partitions *partitionTrie // index for partitioning structure in underlying store + pm *pathMapper // used for mapping between logical storage paths and actual storage keys + db *Store // handle for the database this transaction was created on + xid uint64 // unique id for this transaction + stale bool // bit to indicate if the transaction was already aborted/committed + write bool // bit to indicate if the transaction may perform writes + event storage.TriggerEvent // constructed as we go, supplied by the caller to be included in triggers + metrics metrics.Metrics // per-transaction metrics } func newTransaction(xid uint64, write bool, underlying *badger.Txn, context *storage.Context, pm *pathMapper, trie *partitionTrie, db *Store) *transaction { + + // Even if the caller is not interested, these will contribute 
+	// to the prometheus metrics on commit.
+	var m metrics.Metrics
+	if context != nil {
+		m = context.Metrics()
+	}
+	if m == nil {
+		m = metrics.New()
+	}
+
 	return &transaction{
 		underlying: underlying,
 		partitions: trie,
@@ -38,7 +63,10 @@ func newTransaction(xid uint64, write bool, underlying *badger.Txn, context *sto
 		xid:        xid,
 		stale:      false,
 		write:      write,
-		context:    context,
+		event: storage.TriggerEvent{
+			Context: context,
+		},
+		metrics: m,
 	}
 }
 
@@ -46,16 +74,31 @@ func (txn *transaction) ID() uint64 {
 	return txn.xid
 }
 
+// Commit will commit the underlying transaction, and forward the per-transaction
+// metrics into prometheus metrics.
+// NOTE(sr): aborted transactions are not measured
 func (txn *transaction) Commit(context.Context) (storage.TriggerEvent, error) {
-	// TODO(tsandall): This implementation does not provide any data or policy
-	// events because they are of minimal value given that the transaction
-	// cannot be used for reads once the commit finishes. This differs from the
-	// in-memory store.
-	//
-	// We should figure out how to remove the code in the plugin manager that
-	// performs reads on committed transactions.
 	txn.stale = true
-	return storage.TriggerEvent{Context: txn.context}, wrapError(txn.underlying.Commit())
+	txn.metrics.Timer(commitTimer).Start()
+	err := wrapError(txn.underlying.Commit())
+	txn.metrics.Timer(commitTimer).Stop()
+
+	if err != nil {
+		return txn.event, err
+	}
+
+	m := txn.metrics.All()
+	if txn.write {
+		forwardMetric(m, readKeysCounter, keysReadPerStoreWrite)
+		forwardMetric(m, writtenKeysCounter, keysWrittenPerStoreWrite)
+		forwardMetric(m, deletedKeysCounter, keysDeletedPerStoreWrite)
+		forwardMetric(m, readValueBytesCounter, bytesReadPerStoreWrite)
+	} else {
+		forwardMetric(m, readKeysCounter, keysReadPerStoreRead)
+		forwardMetric(m, readValueBytesCounter, bytesReadPerStoreRead)
+	}
+	return txn.event, nil
 }
 
 func (txn *transaction) Abort(context.Context) {
@@ -63,12 +106,13 @@ func (txn *transaction) Abort(context.Context) {
 	txn.underlying.Discard()
 }
 
-func (txn *transaction) Read(_ context.Context, path storage.Path) (interface{}, error) {
+func (txn *transaction) Read(ctx context.Context, path storage.Path) (interface{}, error) {
+	txn.metrics.Timer(readTimer).Start()
+	defer txn.metrics.Timer(readTimer).Stop()
 
 	i, node := txn.partitions.Find(path)
 
 	if node == nil {
-
 		key, err := txn.pm.DataPath2Key(path[:i])
 		if err != nil {
 			return nil, err
@@ -87,10 +131,10 @@ func (txn *transaction) Read(_ context.Context, path storage.Path) (interface{},
 		return nil, err
 	}
 
-	return txn.readMultiple(i, key)
+	return txn.readMultiple(ctx, i, key)
 }
 
-func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, error) {
+func (txn *transaction) readMultiple(ctx context.Context, offset int, prefix []byte) (interface{}, error) {
 
 	result := map[string]interface{}{}
 
@@ -98,8 +142,14 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er
 	defer it.Close()
 
 	var keybuf, valbuf []byte
+	var count uint64
 
 	for it.Rewind(); it.Valid(); it.Next() {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
+		count++
 		keybuf = it.Item().KeyCopy(keybuf)
 
 		path, err := txn.pm.DataKey2Path(keybuf)
@@ -111,6 +161,7 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er
 		if err != nil {
 			return nil, wrapError(err)
 		}
+		txn.metrics.Counter(readValueBytesCounter).Add(uint64(len(valbuf)))
 
 		var value interface{}
 		if err := deserialize(valbuf, &value); err
!= nil { @@ -135,6 +186,8 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er node[path[len(path)-1]] = value } + txn.metrics.Counter(readKeysCounter).Add(count) + if len(result) == 0 { return nil, errNotFound } @@ -143,6 +196,7 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er } func (txn *transaction) readOne(key []byte) (interface{}, error) { + txn.metrics.Counter(readKeysCounter).Add(1) item, err := txn.underlying.Get(key) if err != nil { @@ -155,6 +209,7 @@ func (txn *transaction) readOne(key []byte) (interface{}, error) { var val interface{} err = item.Value(func(bs []byte) error { + txn.metrics.Counter(readValueBytesCounter).Add(uint64(len(bs))) return deserialize(bs, &val) }) @@ -164,66 +219,49 @@ func (txn *transaction) readOne(key []byte) (interface{}, error) { type update struct { key []byte value []byte + data interface{} delete bool } -// errTxnTooBigErrorHandler checks if the passed error was caused by a transaction -// that was _too big_. If so, it attempts to commit the transaction and opens a new one. -// See https://dgraph.io/docs/badger/get-started/#read-write-transactions -func errTxnTooBigErrorHandler(txn *transaction, err error) error { - errSetCommit := txn.underlying.Commit() - if errSetCommit != nil { - return wrapError(errSetCommit) - } - txn.underlying = txn.db.db.NewTransaction(true) - return nil -} - -// !!!  infinite recursion only if infinite txn too big error occurred -func deleteWithErrTxnTooBigErrorHandling(txn *transaction, u *update) error { - err := txn.underlying.Delete(u.key) - if err == badger.ErrTxnTooBig { - if txnErr := errTxnTooBigErrorHandler(txn, err); txnErr != nil { - return txnErr - } - return deleteWithErrTxnTooBigErrorHandling(txn, u) - } - return wrapError(err) -} - -// !!! infinite recursion only if infinite txn too big error occurred -func setWithErrTxnTooBigErrorHandling(txn *transaction, u *update) error { - err := txn.underlying.Set(u.key, u.value) - if err == badger.ErrTxnTooBig { - if txnErr := errTxnTooBigErrorHandler(txn, err); txnErr != nil { - return txnErr - } - return setWithErrTxnTooBigErrorHandling(txn, u) - } - return wrapError(err) -} - func (txn *transaction) Write(_ context.Context, op storage.PatchOp, path storage.Path, value interface{}) error { + txn.metrics.Timer(writeTimer).Start() + defer txn.metrics.Timer(writeTimer).Stop() + updates, err := txn.partitionWrite(op, path, value) if err != nil { return err } for _, u := range updates { if u.delete { - if err := deleteWithErrTxnTooBigErrorHandling(txn, &u); err != nil { + if err := txn.underlying.Delete(u.key); err != nil { return err } + txn.metrics.Counter(deletedKeysCounter).Add(1) } else { - if err := setWithErrTxnTooBigErrorHandling(txn, &u); err != nil { + if err := txn.underlying.Set(u.key, u.value); err != nil { return err } + txn.metrics.Counter(writtenKeysCounter).Add(1) } + + txn.event.Data = append(txn.event.Data, storage.DataEvent{ + Path: path, // ? 
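+			// (path above is the logical write path; a write that fans out
+			// into multiple keys reports that same path for every update)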
+ Data: u.data, // nil if delete == true + Removed: u.delete, + }) } return nil } func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, value interface{}) ([]update, error) { + if op == storage.RemoveOp && len(path) == 0 { + return nil, &storage.Error{ + Code: storage.InvalidPatchErr, + Message: "root cannot be removed", + } + } + i, node := txn.partitions.Find(path) if node == nil { @@ -241,11 +279,11 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va } curr, err := txn.readOne(key) - if err != nil { + if err != nil && err != errNotFound { return nil, err } - modified, err := patch(curr, op, path[i:], value) + modified, err := patch(curr, op, path, i, value) if err != nil { return nil, err } @@ -254,8 +292,7 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va if err != nil { return nil, err } - - return []update{{key: key, value: bs}}, nil + return []update{{key: key, value: bs, data: modified}}, nil } key, err := txn.pm.DataPrefix2Key(path) @@ -270,6 +307,7 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va for it.Rewind(); it.Valid(); it.Next() { updates = append(updates, update{key: it.Item().KeyCopy(nil), delete: true}) + txn.metrics.Counter(readKeysCounter).Add(1) } if op == storage.RemoveOp { @@ -291,23 +329,27 @@ func (txn *transaction) partitionWriteMultiple(node *partitionTrie, path storage for k, v := range obj { child := append(path, k) next, ok := node.partitions[k] - if !ok { - key, err := txn.pm.DataPath2Key(child) - if err != nil { - return nil, err - } - bs, err := serialize(v) - if err != nil { - return nil, err - } - result = append(result, update{key: key, value: bs}) - } else { + if !ok { // try wildcard + next, ok = node.partitions[pathWildcard] + } + if ok { var err error result, err = txn.partitionWriteMultiple(next, child, v, result) if err != nil { return nil, err } + continue + } + + key, err := txn.pm.DataPath2Key(child) + if err != nil { + return nil, err + } + bs, err := serialize(v) + if err != nil { + return nil, err } + result = append(result, update{key: key, value: bs, data: v}) } return result, nil @@ -328,10 +370,10 @@ func (txn *transaction) partitionWriteOne(op storage.PatchOp, path storage.Path, return nil, err } - return []update{{key: key, value: val}}, nil + return []update{{key: key, value: val, data: value}}, nil } -func (txn *transaction) ListPolicies(context.Context) ([]string, error) { +func (txn *transaction) ListPolicies(ctx context.Context) ([]string, error) { var result []string @@ -344,6 +386,10 @@ func (txn *transaction) ListPolicies(context.Context) ([]string, error) { var key []byte for it.Rewind(); it.Valid(); it.Next() { + if ctx.Err() != nil { + return nil, ctx.Err() + } + txn.metrics.Counter(readKeysCounter).Add(1) item := it.Item() key = item.KeyCopy(key) result = append(result, txn.pm.PolicyKey2ID(key)) @@ -353,20 +399,41 @@ func (txn *transaction) ListPolicies(context.Context) ([]string, error) { } func (txn *transaction) GetPolicy(_ context.Context, id string) ([]byte, error) { + txn.metrics.Counter(readKeysCounter).Add(1) item, err := txn.underlying.Get(txn.pm.PolicyID2Key(id)) if err != nil { + if err == badger.ErrKeyNotFound { + return nil, errors.NewNotFoundErrorf("policy id %q", id) + } return nil, err } bs, err := item.ValueCopy(nil) + txn.metrics.Counter(readValueBytesCounter).Add(uint64(len(bs))) return bs, wrapError(err) } func (txn *transaction) UpsertPolicy(_ context.Context, id string, bs 
[]byte) error { - return wrapError(txn.underlying.Set(txn.pm.PolicyID2Key(id), bs)) + if err := txn.underlying.Set(txn.pm.PolicyID2Key(id), bs); err != nil { + return wrapError(err) + } + txn.metrics.Counter(writtenKeysCounter).Add(1) + txn.event.Policy = append(txn.event.Policy, storage.PolicyEvent{ + ID: id, + Data: bs, + }) + return nil } func (txn *transaction) DeletePolicy(_ context.Context, id string) error { - return wrapError(txn.underlying.Delete(txn.pm.PolicyID2Key(id))) + if err := txn.underlying.Delete(txn.pm.PolicyID2Key(id)); err != nil { + return wrapError(err) + } + txn.metrics.Counter(deletedKeysCounter).Add(1) + txn.event.Policy = append(txn.event.Policy, storage.PolicyEvent{ + ID: id, + Removed: true, + }) + return nil } func serialize(value interface{}) ([]byte, error) { @@ -379,79 +446,99 @@ func deserialize(bs []byte, result interface{}) error { return wrapError(d.Decode(&result)) } -func patch(data interface{}, op storage.PatchOp, path storage.Path, value interface{}) (interface{}, error) { - - if len(path) == 0 { +func patch(data interface{}, op storage.PatchOp, path storage.Path, idx int, value interface{}) (interface{}, error) { + if idx == len(path) { panic("unreachable") } // Base case: mutate the data value in-place. - if len(path) == 1 { + if len(path) == idx+1 { // last element switch x := data.(type) { case map[string]interface{}: + key := path[len(path)-1] switch op { case storage.RemoveOp: - key := path[len(path)-1] if _, ok := x[key]; !ok { return nil, errors.NewNotFoundError(path) } delete(x, key) return x, nil case storage.ReplaceOp: - key := path[len(path)-1] if _, ok := x[key]; !ok { return nil, errors.NewNotFoundError(path) } x[key] = value return x, nil case storage.AddOp: - key := path[len(path)-1] x[key] = value return x, nil } case []interface{}: - if path[0] == "-" { - return append(x, value), nil - } - idx, err := ptr.ValidateArrayIndex(x, path[0], path) - if err != nil { - return nil, err + switch op { + case storage.AddOp: + if path[idx] == "-" || path[idx] == strconv.Itoa(len(x)) { + return append(x, value), nil + } + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) + if err != nil { + return nil, err + } + // insert at i + return append(x[:i], append([]interface{}{value}, x[i:]...)...), nil + case storage.ReplaceOp: + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) + if err != nil { + return nil, err + } + x[i] = value + return x, nil + case storage.RemoveOp: + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) + if err != nil { + return nil, err + } + return append(x[:i], x[i+1:]...), nil // i is skipped + default: + panic("unreachable") } - x[idx] = value - return x, nil + case nil: // data wasn't set before + return map[string]interface{}{path[idx]: value}, nil default: return nil, errors.NewNotFoundError(path) } } // Recurse on the value located at the next part of the path. 
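+	// The nil cases below are what makes MakeDir implicit for the disk
+	// store: writing under a path whose parents don't exist yet creates the
+	// intermediate objects on the fly.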
- key := path[0] + key := path[idx] switch x := data.(type) { case map[string]interface{}: - child, ok := x[key] - if !ok { - return nil, errors.NewNotFoundError(path) - } - modified, err := patch(child, op, path[1:], value) + modified, err := patch(x[key], op, path, idx+1, value) if err != nil { return nil, err } x[key] = modified return x, nil case []interface{}: - idx, err := ptr.ValidateArrayIndex(x, path[0], path) + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx+1, path) if err != nil { return nil, err } - modified, err := patch(x[idx], op, path[1:], value) + modified, err := patch(x[i], op, path, idx+1, value) if err != nil { return nil, err } - x[idx] = modified + x[i] = modified return x, nil + case nil: // data isn't there yet + y := make(map[string]interface{}, 1) + modified, err := patch(nil, op, path, idx+1, value) + if err != nil { + return nil, err + } + y[key] = modified + return y, nil default: return nil, errors.NewNotFoundError(path) } - } diff --git a/storage/disk/txn_test.go b/storage/disk/txn_test.go index fa319f3023..2f2d4906f0 100644 --- a/storage/disk/txn_test.go +++ b/storage/disk/txn_test.go @@ -6,13 +6,14 @@ package disk import ( "context" + "errors" + "fmt" "math/rand" - "strconv" - "strings" "testing" + "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage" - "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/util/test" ) @@ -26,57 +27,54 @@ func randomString(n int) string { return string(s) } -func fixture(nbKeys int) interface{} { - i := 1 - var keyValues = []string{} - for i <= nbKeys { - keyValues = append(keyValues, "\""+strconv.Itoa(i)+randomString(4)+"\": \""+randomString(3)+"\"") - i++ +func fixture(n int) map[string]interface{} { + foo := map[string]string{} + for i := 0; i < n; i++ { + foo[fmt.Sprintf(`"%d%s"`, i, randomString(4))] = randomString(3) } - jsonBytes := []byte(`{"foo":{` + strings.Join(keyValues, ",") + `}}`) - return util.MustUnmarshalJSON(jsonBytes) + return map[string]interface{}{"foo": foo} } -func TestSetTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) { - test.WithTempFS(map[string]string{}, func(dir string) { +func TestSetTxnIsTooBigToFitIntoOneRequestWhenUseDiskStoreReturnsError(t *testing.T) { + test.WithTempFS(nil, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo"), }}) if err != nil { t.Fatal(err) } - nbKeys := 140_000 // !!! 
135_000 it's ok, but 140_000 not + nbKeys := 140_000 // 135_000 is ok, but 140_000 not jsonFixture := fixture(nbKeys) - errTxn := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { - - errTxnWrite := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture) - if errTxnWrite != nil { - t.Fatal(errTxnWrite) + err = storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { + err := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - return nil + return err }) - if errTxn != nil { - t.Fatal(errTxn) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - result, errRead := storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) - if errRead != nil { - t.Fatal(errRead) + _, err = storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) + var notFound *storage.Error + ok := errors.As(err, ¬Found) + if !ok { + t.Errorf("expected %T, got %v", notFound, err) } - actualNbKeys := len(result.(map[string]interface{})) - if nbKeys != actualNbKeys { - t.Fatalf("Expected %d keys, read %d", nbKeys, actualNbKeys) + if exp, act := storage.NotFoundErr, notFound.Code; exp != act { + t.Errorf("expected code %v, got %v", exp, act) } }) } func TestDeleteTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) { - test.WithTempFS(map[string]string{}, func(dir string) { + test.WithTempFS(nil, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo"), }}) if err != nil { @@ -84,36 +82,45 @@ func TestDeleteTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) { } nbKeys := 200_000 jsonFixture := fixture(nbKeys) - errTxn := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { + foo := jsonFixture["foo"].(map[string]string) - errTxnWrite := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture) - if errTxnWrite != nil { - t.Fatal(errTxnWrite) + // Write data in increments so we don't step over the too-large-txn limit + for k, v := range foo { + err := storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo/"+k), v) + if err != nil { + t.Fatal(err) } - return nil - }) - if errTxn != nil { - t.Fatal(errTxn) } - errTxnD := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { - errTxnWrite := s.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath("/foo"), jsonFixture) - if errTxnWrite != nil { - t.Fatal(errTxnWrite) + // check expected state + res, err := storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) + if err != nil { + t.Fatal(err) + } + if exp, act := nbKeys, len(res.(map[string]interface{})); exp != act { + t.Fatalf("expected %d keys, read %d", exp, act) + } + + err = storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { + err := s.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath("/foo"), jsonFixture) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - return nil + return err }) - if errTxnD != nil { - t.Fatal(errTxnD) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - results, errRead := storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) - if 
!storage.IsNotFound(errRead) { - t.Fatal(errRead) + // check expected state again + res, err = storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) + if err != nil { + t.Fatal(err) } - if results != nil { - t.Fatalf("Unexpected results %v", results) + if exp, act := nbKeys, len(res.(map[string]interface{})); exp != act { + t.Fatalf("expected %d keys, read %d", exp, act) } + }) } diff --git a/storage/errors.go b/storage/errors.go index d83b275ac7..2ea68fca34 100644 --- a/storage/errors.go +++ b/storage/errors.go @@ -99,7 +99,7 @@ func IsIndexingNotSupported(error) bool { return false } func writeConflictError(path Path) *Error { return &Error{ Code: WriteConflictErr, - Message: fmt.Sprint(path), + Message: path.String(), } } diff --git a/storage/inmem/inmem_test.go b/storage/inmem/inmem_test.go index 1356bcfa03..8902d751fa 100644 --- a/storage/inmem/inmem_test.go +++ b/storage/inmem/inmem_test.go @@ -610,7 +610,7 @@ func TestInMemoryContext(t *testing.T) { } _, err = store.Register(ctx, txn, storage.TriggerConfig{ - OnCommit: func(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) { + OnCommit: func(_ context.Context, _ storage.Transaction, event storage.TriggerEvent) { if event.Context.Get("foo") != "bar" { t.Fatalf("Expected foo/bar in context but got: %+v", event.Context) } else if event.Context.Get("deadbeef") != nil { diff --git a/storage/inmem/txn.go b/storage/inmem/txn.go index 5254523ec4..440ec9f6ac 100644 --- a/storage/inmem/txn.go +++ b/storage/inmem/txn.go @@ -311,20 +311,21 @@ func newUpdateArray(data []interface{}, op storage.PatchOp, path storage.Path, i return nil, err } - if op == storage.AddOp { + switch op { + case storage.AddOp: cpy := make([]interface{}, len(data)+1) copy(cpy[:pos], data[:pos]) copy(cpy[pos+1:], data[pos:]) cpy[pos] = value return &update{path[:len(path)-1], false, cpy}, nil - } else if op == storage.RemoveOp { + case storage.RemoveOp: cpy := make([]interface{}, len(data)-1) copy(cpy[:pos], data[:pos]) copy(cpy[pos:], data[pos+1:]) return &update{path[:len(path)-1], false, cpy}, nil - } else { + default: cpy := make([]interface{}, len(data)) copy(cpy, data) cpy[pos] = value diff --git a/storage/interface.go b/storage/interface.go index 7502e7babc..38252f32da 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -6,6 +6,8 @@ package storage import ( "context" + + "github.com/open-policy-agent/opa/metrics" ) // Transaction defines the interface that identifies a consistent snapshot over @@ -77,6 +79,27 @@ func (ctx *Context) Put(key, value interface{}) { ctx.values[key] = value } +var metricsKey = struct{}{} + +// WithMetrics allows passing metrics via the Context. +// It puts the metrics object in the ctx, and returns the same +// ctx (not a copy) for convenience. +func (ctx *Context) WithMetrics(m metrics.Metrics) *Context { + ctx.values[metricsKey] = m + return ctx +} + +// Metrics() allows using a Context's metrics. Returns nil if metrics +// were not attached to the Context. +func (ctx *Context) Metrics() metrics.Metrics { + if m, ok := ctx.values[metricsKey]; ok { + if met, ok := m.(metrics.Metrics); ok { + return met + } + } + return nil +} + // WriteParams specifies the TransactionParams for a write transaction. 
var WriteParams = TransactionParams{ Write: true, diff --git a/storage/internal/errors/errors.go b/storage/internal/errors/errors.go index 74792f63e4..0bba74b907 100644 --- a/storage/internal/errors/errors.go +++ b/storage/internal/errors/errors.go @@ -30,3 +30,10 @@ func NewNotFoundErrorf(f string, a ...interface{}) *storage.Error { Message: msg, } } + +func NewWriteConflictError(p storage.Path) *storage.Error { + return &storage.Error{ + Code: storage.WriteConflictErr, + Message: p.String(), + } +} diff --git a/storage/internal/ptr/ptr.go b/storage/internal/ptr/ptr.go index 6778805faa..56772f7976 100644 --- a/storage/internal/ptr/ptr.go +++ b/storage/internal/ptr/ptr.go @@ -37,12 +37,32 @@ func Ptr(data interface{}, path storage.Path) (interface{}, error) { } func ValidateArrayIndex(arr []interface{}, s string, path storage.Path) (int, error) { - idx, err := strconv.Atoi(s) - if err != nil { + idx, ok := isInt(s) + if !ok { return 0, errors.NewNotFoundErrorWithHint(path, errors.ArrayIndexTypeMsg) } - if idx < 0 || idx >= len(arr) { + return inRange(idx, arr, path) +} + +// ValidateArrayIndexForWrite also checks that `s` is a valid way to address an +// array element like `ValidateArrayIndex`, but returns a `resource_conflict` error +// if it is not. +func ValidateArrayIndexForWrite(arr []interface{}, s string, i int, path storage.Path) (int, error) { + idx, ok := isInt(s) + if !ok { + return 0, errors.NewWriteConflictError(path[:i-1]) + } + return inRange(idx, arr, path) +} + +func isInt(s string) (int, bool) { + idx, err := strconv.Atoi(s) + return idx, err == nil +} + +func inRange(i int, arr []interface{}, path storage.Path) (int, error) { + if i < 0 || i >= len(arr) { return 0, errors.NewNotFoundErrorWithHint(path, errors.OutOfRangeMsg) } - return idx, nil + return i, nil } diff --git a/test/authz/authz_bench_test.go b/test/authz/authz_bench_test.go index 9311ca8bc1..dc6cf9c75e 100644 --- a/test/authz/authz_bench_test.go +++ b/test/authz/authz_bench_test.go @@ -6,16 +6,25 @@ package authz import ( "context" + "io/ioutil" + "os" "testing" "github.com/open-policy-agent/opa/ast" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/rego" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" ) func BenchmarkAuthzForbidAuthn(b *testing.B) { - runAuthzBenchmark(b, ForbidIdentity, 10) + b.Run("inmem", func(b *testing.B) { + runAuthzBenchmark(b, ForbidIdentity, 10) + }) + b.Run("disk", func(b *testing.B) { + runAuthzBenchmark(b, ForbidIdentity, 10, true) + }) } func BenchmarkAuthzForbidPath(b *testing.B) { @@ -38,7 +47,7 @@ func BenchmarkAuthzAllow1000Paths(b *testing.B) { runAuthzBenchmark(b, Allow, 1000) } -func runAuthzBenchmark(b *testing.B, mode InputMode, numPaths int) { +func runAuthzBenchmark(b *testing.B, mode InputMode, numPaths int, extras ...bool) { profile := DataSetProfile{ NumTokens: 1000, @@ -47,7 +56,36 @@ func runAuthzBenchmark(b *testing.B, mode InputMode, numPaths int) { ctx := context.Background() data := GenerateDataset(profile) - store := inmem.NewFromObject(data) + + useDisk := false + if len(extras) > 0 { + useDisk = extras[0] + } + + var store storage.Store + if useDisk { + rootDir, err := ioutil.TempDir("", "test-e2e-bench-disk") + if err != nil { + panic(err) + } + + defer os.RemoveAll(rootDir) + store, err = disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: rootDir, + Partitions: nil, + }) + if err != nil { + b.Fatal(err) + } + + err = 
storage.WriteOne(ctx, store, storage.AddOp, storage.Path{}, data) + if err != nil { + b.Fatal(err) + } + } else { + store = inmem.NewFromObject(data) + } + txn := storage.NewTransactionOrDie(ctx, store) compiler := ast.NewCompiler() module := ast.MustParseModule(Policy) diff --git a/test/e2e/authz/authz_bench_integration_test.go b/test/e2e/authz/authz_bench_integration_test.go index 92605b2e54..5eb82a3379 100644 --- a/test/e2e/authz/authz_bench_integration_test.go +++ b/test/e2e/authz/authz_bench_integration_test.go @@ -24,7 +24,8 @@ func TestMain(m *testing.M) { flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - + disk, cleanup := diskStorage() + testServerParams.DiskStorage = disk var err error testRuntime, err = e2e.NewTestRuntime(testServerParams) if err != nil { @@ -32,6 +33,11 @@ func TestMain(m *testing.M) { } errc := testRuntime.RunTests(m) + if cleanup != nil { + if err := cleanup(); err != nil { + panic(err) + } + } os.Exit(errc) } diff --git a/test/e2e/authz/disk.go b/test/e2e/authz/disk.go new file mode 100644 index 0000000000..4bb8592811 --- /dev/null +++ b/test/e2e/authz/disk.go @@ -0,0 +1,25 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +//go:build bench_disk +// +build bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import ( + "io/ioutil" + "os" + + "github.com/open-policy-agent/opa/storage/disk" +) + +func diskStorage() (*disk.Options, func() error) { + dir, err := ioutil.TempDir("", "disk-store") + if err != nil { + panic(err) + } + + return &disk.Options{Dir: dir, Partitions: nil}, func() error { return os.RemoveAll(dir) } +} diff --git a/test/e2e/authz/nodisk.go b/test/e2e/authz/nodisk.go new file mode 100644 index 0000000000..8849441bd1 --- /dev/null +++ b/test/e2e/authz/nodisk.go @@ -0,0 +1,15 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. 
+ +//go:build !bench_disk +// +build !bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import "github.com/open-policy-agent/opa/storage/disk" + +func diskStorage() (*disk.Options, func() error) { + return nil, nil +} diff --git a/test/e2e/concurrency/concurrency_test.go b/test/e2e/concurrency/concurrency_test.go index 95825725d3..e084af0d86 100644 --- a/test/e2e/concurrency/concurrency_test.go +++ b/test/e2e/concurrency/concurrency_test.go @@ -2,6 +2,7 @@ package concurrency import ( "flag" + "io/ioutil" "os" "runtime" "strings" @@ -9,6 +10,7 @@ import ( "testing" "github.com/open-policy-agent/opa/server/types" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/test/e2e" ) @@ -18,13 +20,26 @@ func TestMain(m *testing.M) { flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - var err error - testRuntime, err = e2e.NewTestRuntimeWithOpts(e2e.TestRuntimeOpts{}, testServerParams) + dir, err := ioutil.TempDir("", "disk-store") if err != nil { - os.Exit(1) + panic(err) + } + defer func() { os.RemoveAll(dir) }() + + for _, opts := range []*disk.Options{ + nil, + {Dir: dir, Partitions: nil}, + } { + var err error + testServerParams.DiskStorage = opts + testRuntime, err = e2e.NewTestRuntime(testServerParams) + if err != nil { + panic(err) + } + if ec := testRuntime.RunTests(m); ec != 0 { + os.Exit(ec) + } } - - os.Exit(testRuntime.RunTests(m)) } func TestConcurrencyGetV1Data(t *testing.T) { diff --git a/test/e2e/shutdown/shutdown_test.go b/test/e2e/shutdown/shutdown_test.go index a3e5580734..8579d8e2b4 100644 --- a/test/e2e/shutdown/shutdown_test.go +++ b/test/e2e/shutdown/shutdown_test.go @@ -43,7 +43,7 @@ func TestShutdownWaitPeriod(t *testing.T) { time.Sleep(1500 * time.Millisecond) - // Ensure that OPA i still running + // Ensure that OPA is still running err = testRuntime.HealthCheck(testRuntime.URL()) if err != nil { t.Fatalf("Expected health endpoint to be up but got:\n\n%v", err) diff --git a/test/e2e/wasm/authz/authz_bench_integration_test.go b/test/e2e/wasm/authz/authz_bench_integration_test.go index 9c1923d497..bf67e58ffd 100644 --- a/test/e2e/wasm/authz/authz_bench_integration_test.go +++ b/test/e2e/wasm/authz/authz_bench_integration_test.go @@ -2,6 +2,7 @@ // Use of this source code is governed by an Apache2 // license that can be found in the LICENSE file. +//go:build opa_wasm // +build opa_wasm package authz @@ -36,7 +37,8 @@ func TestMain(m *testing.M) { flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - + var cleanup func() error + testServerParams.DiskStorage, cleanup = diskStorage() var err error testRuntime, err = e2e.NewTestRuntime(testServerParams) if err != nil { @@ -93,6 +95,11 @@ func TestMain(m *testing.M) { } errc := testRuntime.RunTests(m) + if errc == 0 && cleanup != nil { + if err := cleanup(); err != nil { + panic(err) + } + } os.Exit(errc) } diff --git a/test/e2e/wasm/authz/disk.go b/test/e2e/wasm/authz/disk.go new file mode 100644 index 0000000000..4bb8592811 --- /dev/null +++ b/test/e2e/wasm/authz/disk.go @@ -0,0 +1,25 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. 
+ +//go:build bench_disk +// +build bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import ( + "io/ioutil" + "os" + + "github.com/open-policy-agent/opa/storage/disk" +) + +func diskStorage() (*disk.Options, func() error) { + dir, err := ioutil.TempDir("", "disk-store") + if err != nil { + panic(err) + } + + return &disk.Options{Dir: dir, Partitions: nil}, func() error { return os.RemoveAll(dir) } +} diff --git a/test/e2e/wasm/authz/nodisk.go b/test/e2e/wasm/authz/nodisk.go new file mode 100644 index 0000000000..8849441bd1 --- /dev/null +++ b/test/e2e/wasm/authz/nodisk.go @@ -0,0 +1,15 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +//go:build !bench_disk +// +build !bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import "github.com/open-policy-agent/opa/storage/disk" + +func diskStorage() (*disk.Options, func() error) { + return nil, nil +}
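--
Usage sketch (illustrative, not part of the patch): a minimal standalone program
exercising the disk store the same way the tests above do. Only disk.New,
storage.WriteOne, storage.ReadOne, storage.MustParsePath, and
logging.NewNoOpLogger are taken from the diff; the directory handling, the
partition layout, and the key names are assumptions made up for the example.

	package main

	import (
		"context"
		"fmt"
		"io/ioutil"
		"os"

		"github.com/open-policy-agent/opa/logging"
		"github.com/open-policy-agent/opa/storage"
		"github.com/open-policy-agent/opa/storage/disk"
	)

	func main() {
		ctx := context.Background()

		// The store persists its badger files under a directory; a
		// throwaway temp dir is used here purely for illustration.
		dir, err := ioutil.TempDir("", "disk-store")
		if err != nil {
			panic(err)
		}
		defer os.RemoveAll(dir)

		// The nil third argument mirrors the tests above (no Prometheus
		// registration). Partitioning /foo stores each child of /foo
		// under its own key.
		store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{
			Dir:        dir,
			Partitions: []storage.Path{storage.MustParsePath("/foo")},
		})
		if err != nil {
			panic(err)
		}

		// A single huge write can exceed badger's transaction size limit;
		// badger.ErrTxnTooBig is returned to the caller (see
		// TestSetTxnIsTooBigToFitIntoOneRequestWhenUseDiskStoreReturnsError),
		// so large datasets should be pushed in smaller increments, as in
		// TestDeleteTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore.
		for i := 0; i < 3; i++ {
			path := storage.MustParsePath(fmt.Sprintf("/foo/key%d", i))
			if err := storage.WriteOne(ctx, store, storage.AddOp, path, "value"); err != nil {
				panic(err)
			}
		}

		// Reading /foo stitches the partitioned keys back into one object.
		val, err := storage.ReadOne(ctx, store, storage.MustParsePath("/foo"))
		if err != nil {
			panic(err)
		}
		fmt.Println(val) // map[key0:value key1:value key2:value]
	}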