diff --git a/config/config.go b/config/config.go
index b38a432207..837b075a38 100644
--- a/config/config.go
+++ b/config/config.go
@@ -34,6 +34,9 @@ type Config struct {
 	Caching              json.RawMessage `json:"caching,omitempty"`
 	PersistenceDirectory *string         `json:"persistence_directory,omitempty"`
 	DistributedTracing   json.RawMessage `json:"distributed_tracing,omitempty"`
+	Storage *struct {
+		Disk json.RawMessage `json:"disk,omitempty"`
+	} `json:"storage,omitempty"`
 }
 
 // ParseConfig returns a valid Config object with defaults injected. The id
diff --git a/docs/content/configuration.md b/docs/content/configuration.md
index 4366e48f03..1a20fad004 100644
--- a/docs/content/configuration.md
+++ b/docs/content/configuration.md
@@ -837,3 +837,20 @@ The following encryption methods are supported:
 | `off` | Disable TLS |
 | `tls` | Enable TLS |
 | `mtls` | Enable mutual TLS |
+
+### Disk Storage
+
+The `storage` configuration key is used to enable and configure the
+persistent on-disk storage of an OPA instance.
+
+If `disk` is set, the server enables the on-disk store, with the data kept
+in the configured `directory`.
+
+| Field | Type | Required | Description |
+| --- | --- | --- | --- |
+| `storage.disk.directory` | `string` | Yes | The directory to use for storing the persistent database. |
+| `storage.disk.auto_create` | `bool` | No (default: `false`) | If set to true, the configured directory will be created if it does not exist. |
+| `storage.disk.partitions` | `array[string]` | No | Non-overlapping `data` prefixes used for partitioning the data on disk. |
+| `storage.disk.badger` | `string` | No (default: empty) | "Superflags" passed to Badger, allowing advanced options to be modified. |
+
+See [the docs on disk storage](../misc-disk/) for details about the settings.
diff --git a/docs/content/misc-disk.md b/docs/content/misc-disk.md
new file mode 100644
index 0000000000..fb0208ef08
--- /dev/null
+++ b/docs/content/misc-disk.md
@@ -0,0 +1,177 @@
+---
+title: Disk Storage
+kind: misc
+weight: 10
+---
+
+This page outlines configuration options relevant to using the disk storage
+feature of OPA.
+Configuration options can be found in [the configuration docs](../configuration/#disk-storage).
+
+{{< info >}}
+The persistent disk storage enables OPA to work with data that does not fit
+into the memory resources granted to the OPA server.
+It is **not** supposed to be used as the primary source of truth for that data.
+
+The on-disk storage should be considered ephemeral: you need to secure the
+means to restore that data.
+Backup and restore, as well as repair procedures for data corruption, are not
+provided at this time.
+{{< /info >}}
+
+## Partitions
+
+Partitions determine how the JSON data is split up when stored in the
+underlying key-value store.
+
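+For instance, a store with a single partition for user data could be
+configured like this (a minimal sketch; the directory and partition values
+are examples):
+
+```yaml
+storage:
+  disk:
+    directory: /var/opa   # example path; must exist unless auto_create is set
+    auto_create: true
+    partitions:
+    - /users
+```
+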
+The table below shows how an example document would be stored given
+different configured partitions:
+
+```json
+{
+  "users": {
+    "alice": { "roles": ["admin"] },
+    "bob": { "roles": ["viewer"] }
+  }
+}
+```
+
+| Partitions | Keys | Values |
+| --- | --- | --- |
+| (1) none | `/users` | `{"alice": {"roles": ["admin"]}, "bob": {"roles": ["viewer"]}}` |
+| --- | --- | --- |
+| (2) `/users` | `/users/alice` | `{"roles": ["admin"]}` |
+| | `/users/bob` | `{"roles": ["viewer"]}` |
+| --- | --- | --- |
+| (3) `/users/*` | `/users/alice/roles` | `["admin"]` |
+| | `/users/bob/roles` | `["viewer"]` |
+
+Partitioning has consequences for performance: in the example above, the
+number of keys to retrieve from the database (and the amount of data in
+their values) varies.
+
+| Query | Partitions | Number of keys read |
+| --- | --- | --- |
+| `data.users` | (1) | 1 |
+| | (2) | 2 |
+| | (3) | 2 |
+| --- | --- | --- |
+| `data.users.alice` | (1) | 1 (with `bob`'s data thrown away) |
+| | (2) | 1 |
+| | (3) | 1 |
+
+For example, retrieving the full extent of `data.users` from the disk store
+will require a single key fetch with partitioning (1).
+With (2), the storage engine will fetch two keys and their values.
+
+Retrieving a single user's data, e.g. `data.users.alice`, requires reading a
+single key with (1), but that key contains all users' data, and everything
+not belonging to `alice` is thrown away.
+
+There is no one-size-fits-all setting for partitions: good settings depend
+on the actual usage, and that comes down to the policies that are used with
+OPA.
+Commonly, you would optimize the partition settings for those queries that
+are performance critical.
+
+To spot suboptimal partitioning, have a look at the exposed metrics.
+
+## Metrics
+
+Using the [REST API](../rest-api/), you can include the `?metrics` query string
+to gain insight into the disk storage accesses related to a certain OPA query.
+
+```
+$ curl 'http://localhost:8181/v1/data/tenants/acme1/bindings/user1?metrics' | opa eval -I 'input.metrics' -fpretty
+{
+  "counter_disk_read_bytes": 339,
+  "counter_disk_read_keys": 3,
+  "counter_server_query_cache_hit": 1,
+  "timer_disk_read_ns": 40736,
+  "timer_rego_external_resolve_ns": 251,
+  "timer_rego_input_parse_ns": 656,
+  "timer_rego_query_eval_ns": 66616,
+  "timer_server_handler_ns": 117539
+}
+```
+
+The `timer_disk_*_ns` timers indicate how much time was spent on the
+different disk operations.
+
+Available timers are:
+- `timer_disk_read_ns`
+- `timer_disk_write_ns`
+- `timer_disk_commit_ns`
+
+Also note the `counter_disk_*` counters in the metrics:
+
+- `counter_disk_read_keys`: number of keys retrieved
+- `counter_disk_written_keys`: number of keys written
+- `counter_disk_deleted_keys`: number of keys deleted
+- `counter_disk_read_bytes`: bytes retrieved
+
+Suboptimal partition settings can be spotted when the number of keys and
+bytes retrieved for a query is disproportionate to the actual data returned:
+the query likely had to retrieve a giant JSON object, and most of it was
+thrown away.
+
+## Debug Logging
+
+Pass `--log-level debug` to `opa run` to see all the underlying storage
+engine's logs.
+
+When debug logging is _enabled_, the service will output statistics about
+the configured disk partitions and their key sizes.
+
+```
+[DEBUG] partition /tenants/acme3/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme4/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme8/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme9/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme0/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme2/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+[DEBUG] partition /tenants/acme6/bindings (pattern /tenants/*/bindings): key count: 10000 (estimated size 598890 bytes)
+```
+
+Note that gathering these statistics requires iterating over all database
+keys; this only happens on startup, and only when debug logging is enabled.
+
+## Fine-tuning Badger settings (superflags)
+
+While partitioning should be the first thing to adjust when tuning the memory
+usage and performance of the on-disk storage engine, this setting gives you
+the means to change many internal aspects of how Badger uses memory and disk
+storage.
+
+{{< danger >}}
+To be used with care!
+
+Any of the Badger settings used by OPA can be overridden using this feature.
+No validation is performed on configurables set via this flag.
+
+When the embedded Badger version changes, these configurables could change,
+too.
+{{< /danger >}}
+
+The configurables correspond to Badger options that can be set on [the library's Options
+struct](https://pkg.go.dev/github.com/dgraph-io/badger/v3#Options).
+
+The following configurables can *not* be overridden:
+- `dir`
+- `valuedir`
+- `detectconflicts`
+
+Aside from conflict detection, Badger in OPA uses the default options [you can find here](https://github.com/dgraph-io/badger/blob/v3.2103.2/options.go#L128-L187).
+
+Conflict detection is disabled because the locking scheme used within OPA
+does not allow multiple concurrent write transactions.
+
+### Example
+
+```yaml
+storage:
+  disk:
+    directory: /tmp/disk
+    badger: nummemtables=1; numgoroutines=2; maxlevels=3
+```
\ No newline at end of file
diff --git a/docs/content/rest-api.md b/docs/content/rest-api.md
index 34d03544a0..09d81a57c9 100644
--- a/docs/content/rest-api.md
+++ b/docs/content/rest-api.md
@@ -1002,6 +1002,10 @@ If the path does not refer to an existing document, the server will attempt to c
 The server will respect the `If-None-Match` header if it is set to `*`.
 In this case, the server will not overwrite an existing document located at the path.
+
+#### Query Parameters
+
+- **metrics** - Return performance metrics in addition to result. See [Performance Metrics](#performance-metrics) for more detail.
+
 #### Status Codes
 
 - **204** - no content (success)
@@ -1094,6 +1098,10 @@ Delete a document.
 
 The server processes the DELETE method as if the client had sent a PATCH request
 containing a single remove operation.
+
+#### Query Parameters
+
+- **metrics** - Return performance metrics in addition to result. See [Performance Metrics](#performance-metrics) for more detail.
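+
+For example, deleting a document while collecting metrics (the path is
+illustrative; with `?metrics`, the server responds `200 OK` with a JSON body
+containing the metrics, instead of `204 No Content`):
+
+```
+$ curl -X DELETE 'http://localhost:8181/v1/data/servers/sample?metrics'
+```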
+ #### Status Codes - **204** - no content (success) diff --git a/internal/config/config.go b/internal/config/config.go index 69d930f4a4..b2020181a0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -83,7 +83,7 @@ func Load(configFile string, overrides []string, overrideFiles []string) ([]byte processedConf := subEnvVars(string(bytes)) if err := yaml.Unmarshal([]byte(processedConf), &baseConf); err != nil { - return []byte{}, fmt.Errorf("failed to parse %s: %s", configFile, err) + return nil, fmt.Errorf("failed to parse %s: %s", configFile, err) } } @@ -93,7 +93,7 @@ func Load(configFile string, overrides []string, overrideFiles []string) ([]byte for _, override := range overrides { processedOverride := subEnvVars(override) if err := strvals.ParseInto(processedOverride, overrideConf); err != nil { - return []byte{}, fmt.Errorf("failed parsing --set data: %s", err) + return nil, fmt.Errorf("failed parsing --set data: %s", err) } } @@ -105,7 +105,7 @@ func Load(configFile string, overrides []string, overrideFiles []string) ([]byte return value, err } if err := strvals.ParseIntoFile(override, overrideConf, reader); err != nil { - return []byte{}, fmt.Errorf("failed parsing --set-file data: %s", err) + return nil, fmt.Errorf("failed parsing --set-file data: %s", err) } } diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index 2080187120..fba263c5fb 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -188,7 +188,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) { // Deactivate the bundles that were removed params := storage.WriteParams - params.Context = storage.NewContext() + params.Context = storage.NewContext() // TODO(sr): metrics? err := storage.Txn(ctx, p.manager.Store, params, func(txn storage.Transaction) error { opts := &bundle.DeactivateOpts{ Ctx: ctx, @@ -513,7 +513,7 @@ func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) er p.log(name).Debug("Bundle activation in progress. 
Opening storage transaction.") params := storage.WriteParams - params.Context = storage.NewContext() + params.Context = storage.NewContext().WithMetrics(p.status[name].Metrics) err := storage.Txn(ctx, p.manager.Store, params, func(txn storage.Transaction) error { p.log(name).Debug("Opened storage transaction (%v).", txn.ID()) diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index 7820e1d04b..4cce413326 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -23,20 +23,20 @@ import ( "testing" "time" - "github.com/open-policy-agent/opa/internal/file/archive" - - "github.com/open-policy-agent/opa/util/test" - "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/bundle" "github.com/open-policy-agent/opa/config" "github.com/open-policy-agent/opa/download" + "github.com/open-policy-agent/opa/internal/file/archive" "github.com/open-policy-agent/opa/keys" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/plugins" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/util" + "github.com/open-policy-agent/opa/util/test" ) func TestPluginOneShot(t *testing.T) { @@ -97,6 +97,111 @@ func TestPluginOneShot(t *testing.T) { } } +func TestPluginOneShotDiskStorageMetrics(t *testing.T) { + + test.WithTempFS(nil, func(dir string) { + ctx := context.Background() + met := metrics.New() + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: dir, + Partitions: []storage.Path{ + storage.MustParsePath("/foo"), + }, + }) + if err != nil { + t.Fatal(err) + } + manager := getTestManagerWithOpts(nil, store) + defer manager.Stop(ctx) + plugin := New(&Config{}, manager) + bundleName := "test-bundle" + plugin.status[bundleName] = &Status{Name: bundleName, Metrics: met} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) + + ensurePluginState(t, plugin, plugins.StateNotReady) + + module := "package foo\n\ncorge=1" + + b := bundle.Bundle{ + Manifest: bundle.Manifest{Revision: "quickbrownfaux"}, + Data: util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}}`)).(map[string]interface{}), + Modules: []bundle.ModuleFile{ + { + Path: "/foo/bar", + Parsed: ast.MustParseModule(module), + Raw: []byte(module), + }, + }, + } + + b.Manifest.Init() + + met = metrics.New() + plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b, Metrics: met}) + + ensurePluginState(t, plugin, plugins.StateOK) + + // NOTE(sr): These assertion reflect the current behaviour only! Not prescriptive. 
+ name := "disk_deleted_keys" + if exp, act := 1, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + name = "disk_written_keys" + if exp, act := 7, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + name = "disk_read_keys" + if exp, act := 13, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + name = "disk_read_bytes" + if exp, act := 346, met.Counter(name).Value(); act.(uint64) != uint64(exp) { + t.Errorf("%s: expected %v, got %v", name, exp, act) + } + for _, timer := range []string{ + "disk_commit", + "disk_write", + "disk_read", + } { + if act := met.Timer(timer).Int64(); act <= 0 { + t.Errorf("%s: expected non-zero timer, got %v", timer, act) + } + } + if t.Failed() { + t.Logf("all metrics: %v", met.All()) + } + + // Ensure we can read it all back -- this is the only bundle plugin test using disk storage, + // so some duplicating with TestPluginOneShot is OK: + + txn := storage.NewTransactionOrDie(ctx, manager.Store) + defer manager.Store.Abort(ctx, txn) + + ids, err := manager.Store.ListPolicies(ctx, txn) + if err != nil { + t.Fatal(err) + } else if len(ids) != 1 { + t.Fatal("Expected 1 policy") + } + + bs, err := manager.Store.GetPolicy(ctx, txn, ids[0]) + exp := []byte("package foo\n\ncorge=1") + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(bs, exp) { + t.Fatalf("Bad policy content. Exp:\n%v\n\nGot:\n\n%v", string(exp), string(bs)) + } + + data, err := manager.Store.Read(ctx, txn, storage.Path{}) + expData := util.MustUnmarshalJSON([]byte(`{"foo": {"bar": 1, "baz": "qux"}, "system": {"bundles": {"test-bundle": {"manifest": {"revision": "quickbrownfaux", "roots": [""]}}}}}`)) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(data, expData) { + t.Fatalf("Bad data content. Exp:\n%v\n\nGot:\n\n%v", expData, data) + } + }) +} + func TestPluginOneShotDeltaBundle(t *testing.T) { ctx := context.Background() @@ -1654,8 +1759,12 @@ func getTestManager() *plugins.Manager { return getTestManagerWithOpts(nil) } -func getTestManagerWithOpts(config []byte) *plugins.Manager { +func getTestManagerWithOpts(config []byte, stores ...storage.Store) *plugins.Manager { store := inmem.New() + if len(stores) == 1 { + store = stores[0] + } + manager, err := plugins.New(config, "test-instance-id", store) if err != nil { panic(err) diff --git a/plugins/plugins.go b/plugins/plugins.go index a3120b6155..85c260b5fb 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -625,6 +625,11 @@ func (m *Manager) Stop(ctx context.Context) { for i := range toStop { toStop[i].Stop(ctx) } + if c, ok := m.Store.(interface{ Close(context.Context) error }); ok { + if err := c.Close(ctx); err != nil { + m.logger.Error("Error closing store: %v", err) + } + } } // Reconfigure updates the configuration on the manager. diff --git a/rego/example_test.go b/rego/example_test.go index 039c52986c..40d2d0ea2b 100644 --- a/rego/example_test.go +++ b/rego/example_test.go @@ -14,6 +14,7 @@ import ( "os" "strings" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/ast" @@ -352,7 +353,7 @@ func ExampleRego_Eval_persistent_storage() { // user's data is stored on a different row. Assuming the policy only reads // data for a single user to process the policy query, OPA can avoid loading // _all_ user data into memory this way. 
- store, err := disk.New(ctx, disk.Options{ + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: rootDir, Partitions: []storage.Path{{"example", "user"}}, }) @@ -379,7 +380,7 @@ func ExampleRego_Eval_persistent_storage() { // Re-open the store in the same directory. store.Close(ctx) - store2, err := disk.New(ctx, disk.Options{ + store2, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: rootDir, Partitions: []storage.Path{{"example", "user"}}, }) diff --git a/runtime/runtime.go b/runtime/runtime.go index 2fe38e3345..9ac3c3baa3 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -45,6 +45,7 @@ import ( "github.com/open-policy-agent/opa/repl" "github.com/open-policy-agent/opa/server" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/tracing" "github.com/open-policy-agent/opa/util" @@ -203,6 +204,11 @@ type Params struct { // If it is nil, a new mux.Router will be created Router *mux.Router + // DiskStorage, if set, will make the runtime instantiate a disk-backed storage + // implementation (instead of the default, in-memory store). + // It can also be enabled via config, and this runtime field takes precedence. + DiskStorage *disk.Options + DistributedTracingOpts tracing.Options } @@ -300,14 +306,11 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { return nil, err } - var consoleLogger logging.Logger - - if params.ConsoleLogger == nil { + consoleLogger := params.ConsoleLogger + if consoleLogger == nil { l := logging.New() l.SetFormatter(internal_logging.GetFormatter(params.Logging.Format)) consoleLogger = l - } else { - consoleLogger = params.ConsoleLogger } if params.Router == nil { @@ -316,9 +319,26 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { metrics := prometheus.New(metrics.New(), errorLogger(logger)) + var store storage.Store + if params.DiskStorage == nil { + params.DiskStorage, err = disk.OptionsFromConfig(config, params.ID) + if err != nil { + return nil, fmt.Errorf("parse disk store configuration: %w", err) + } + } + + if params.DiskStorage != nil { + store, err = disk.New(ctx, logger, metrics, *params.DiskStorage) + if err != nil { + return nil, fmt.Errorf("initialize disk store: %w", err) + } + } else { + store = inmem.New() + } + manager, err := plugins.New(config, params.ID, - inmem.New(), + store, plugins.Info(info), plugins.InitBundles(loaded.Bundles), plugins.InitFiles(loaded.Files), diff --git a/server/server.go b/server/server.go index 44181667fb..e52bc40d9a 100644 --- a/server/server.go +++ b/server/server.go @@ -1222,7 +1222,8 @@ func (s *Server) v1CompilePost(w http.ResponseWriter, r *http.Request) { m.Timer(metrics.RegoQueryParse).Stop() - txn, err := s.store.NewTransaction(ctx) + c := storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, storage.TransactionParams{Context: c}) if err != nil { writer.ErrorAuto(w, err) return @@ -1331,7 +1332,8 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { m.Timer(metrics.RegoInputParse).Stop() // Prepare for query. 
- txn, err := s.store.NewTransaction(ctx) + c := storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, storage.TransactionParams{Context: c}) if err != nil { writer.ErrorAuto(w, err) return @@ -1455,15 +1457,21 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { } func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) { + m := metrics.New() + m.Timer(metrics.ServerHandler).Start() + defer m.Timer(metrics.ServerHandler).Stop() + ctx := r.Context() vars := mux.Vars(r) - + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) ops := []types.PatchV1{} + m.Timer(metrics.RegoInputParse).Start() if err := util.NewJSONDecoder(r.Body).Decode(&ops); err != nil { writer.ErrorString(w, http.StatusBadRequest, types.CodeInvalidParameter, err) return } + m.Timer(metrics.RegoInputParse).Stop() patches, err := s.prepareV1PatchSlice(vars["path"], ops) if err != nil { @@ -1471,7 +1479,9 @@ func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) { return } - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1500,6 +1510,14 @@ func (s *Server) v1DataPatch(w http.ResponseWriter, r *http.Request) { return } + if includeMetrics { + result := types.DataResponseV1{ + Metrics: m.All(), + } + writer.JSON(w, http.StatusOK, result, false) + return + } + writer.Bytes(w, http.StatusNoContent, nil) } @@ -1541,7 +1559,9 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { m.Timer(metrics.RegoInputParse).Stop() - txn, err := s.store.NewTransaction(ctx) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1674,14 +1694,21 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { } func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { + m := metrics.New() + m.Timer(metrics.ServerHandler).Start() + defer m.Timer(metrics.ServerHandler).Stop() + ctx := r.Context() vars := mux.Vars(r) + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) + m.Timer(metrics.RegoInputParse).Start() var value interface{} if err := util.NewJSONDecoder(r.Body).Decode(&value); err != nil { writer.ErrorString(w, http.StatusBadRequest, types.CodeInvalidParameter, err) return } + m.Timer(metrics.RegoInputParse).Stop() path, ok := storage.ParsePathEscaped("/" + strings.Trim(vars["path"], "/")) if !ok { @@ -1689,7 +1716,9 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { return } - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1701,15 +1730,16 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { } _, err = s.store.Read(ctx, txn, path) - if err != nil { if !storage.IsNotFound(err) { s.abortAuto(ctx, txn, w, err) return } - if err := storage.MakeDir(ctx, s.store, txn, path[:len(path)-1]); err != nil { - s.abortAuto(ctx, txn, w, err) - return + if len(path) > 0 { + if err := storage.MakeDir(ctx, s.store, txn, path[:len(path)-1]); err != nil { + s.abortAuto(ctx, txn, w, err) + return + } } } else if r.Header.Get("If-None-Match") == "*" { 
s.store.Abort(ctx, txn) @@ -1733,12 +1763,25 @@ func (s *Server) v1DataPut(w http.ResponseWriter, r *http.Request) { return } + if includeMetrics { + result := types.DataResponseV1{ + Metrics: m.All(), + } + writer.JSON(w, http.StatusOK, result, false) + return + } + writer.Bytes(w, http.StatusNoContent, nil) } func (s *Server) v1DataDelete(w http.ResponseWriter, r *http.Request) { + m := metrics.New() + m.Timer(metrics.ServerHandler).Start() + defer m.Timer(metrics.ServerHandler).Stop() + ctx := r.Context() vars := mux.Vars(r) + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) path, ok := storage.ParsePathEscaped("/" + strings.Trim(vars["path"], "/")) if !ok { @@ -1746,7 +1789,9 @@ func (s *Server) v1DataDelete(w http.ResponseWriter, r *http.Request) { return } - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1773,6 +1818,14 @@ func (s *Server) v1DataDelete(w http.ResponseWriter, r *http.Request) { return } + if includeMetrics { + result := types.DataResponseV1{ + Metrics: m.All(), + } + writer.JSON(w, http.StatusOK, result, false) + return + } + writer.Bytes(w, http.StatusNoContent, nil) } @@ -1780,7 +1833,7 @@ func (s *Server) v1PoliciesDelete(w http.ResponseWriter, r *http.Request) { ctx := r.Context() vars := mux.Vars(r) pretty := getBoolParam(r.URL, types.ParamPrettyV1, true) - includeMetrics := getBoolParam(r.URL, types.ParamPrettyV1, true) + includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) id, err := url.PathUnescape(vars["path"]) if err != nil { @@ -1789,8 +1842,9 @@ func (s *Server) v1PoliciesDelete(w http.ResponseWriter, r *http.Request) { } m := metrics.New() - - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -1948,7 +2002,9 @@ func (s *Server) v1PoliciesPut(w http.ResponseWriter, r *http.Request) { m.Timer("server_read_bytes").Stop() - txn, err := s.store.NewTransaction(ctx, storage.WriteParams) + params := storage.WriteParams + params.Context = storage.NewContext().WithMetrics(m) + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -2071,7 +2127,8 @@ func (s *Server) v1QueryGet(w http.ResponseWriter, r *http.Request) { includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true) includeInstrumentation := getBoolParam(r.URL, types.ParamInstrumentV1, true) - txn, err := s.store.NewTransaction(ctx) + params := storage.TransactionParams{Context: storage.NewContext().WithMetrics(m)} + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return @@ -2140,7 +2197,8 @@ func (s *Server) v1QueryPost(w http.ResponseWriter, r *http.Request) { } } - txn, err := s.store.NewTransaction(ctx) + params := storage.TransactionParams{Context: storage.NewContext().WithMetrics(m)} + txn, err := s.store.NewTransaction(ctx, params) if err != nil { writer.ErrorAuto(w, err) return diff --git a/server/server_test.go b/server/server_test.go index 3596c141f5..d5cd2551b7 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/open-policy-agent/opa/bundle" "github.com/open-policy-agent/opa/config" 
"github.com/open-policy-agent/opa/internal/distributedtracing" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/plugins" pluginBundle "github.com/open-policy-agent/opa/plugins/bundle" @@ -36,8 +37,10 @@ import ( "github.com/open-policy-agent/opa/server/types" "github.com/open-policy-agent/opa/server/writer" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/util" + "github.com/open-policy-agent/opa/util/test" "github.com/open-policy-agent/opa/version" ) @@ -991,35 +994,48 @@ func TestCompileV1(t *testing.T) { } func TestCompileV1Observability(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } + f := newFixtureWithStore(t, disk) - f := newFixture(t) - - err := f.v1(http.MethodPut, "/policies/test", `package test + err = f.v1(http.MethodPut, "/policies/test", `package test p { input.x = 1 }`, 200, "") - if err != nil { - t.Fatal(err) - } + if err != nil { + t.Fatal(err) + } - compileReq := newReqV1(http.MethodPost, "/compile?metrics&explain=full", `{ + compileReq := newReqV1(http.MethodPost, "/compile?metrics&explain=full", `{ "query": "data.test.p = true" }`) - f.reset() - f.server.Handler.ServeHTTP(f.recorder, compileReq) + f.reset() + f.server.Handler.ServeHTTP(f.recorder, compileReq) - var response types.CompileResponseV1 - if err := json.NewDecoder(f.recorder.Body).Decode(&response); err != nil { - t.Fatal(err) - } + var response types.CompileResponseV1 + if err := json.NewDecoder(f.recorder.Body).Decode(&response); err != nil { + t.Fatal(err) + } - if len(response.Explanation) == 0 { - t.Fatal("Expected non-empty explanation") - } + if len(response.Explanation) == 0 { + t.Fatal("Expected non-empty explanation") + } - if _, ok := response.Metrics["timer_rego_partial_eval_ns"]; !ok { - t.Fatal("Expected partial evaluation latency") - } + assertMetricsExist(t, response.Metrics, []string{ + "timer_rego_partial_eval_ns", + "timer_rego_query_compile_ns", + "timer_rego_query_parse_ns", + "timer_server_handler_ns", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_disk_read_ns", + }) + }) } func TestCompileV1UnsafeBuiltin(t *testing.T) { @@ -1175,13 +1191,23 @@ p = true { false }` }}, {"patch root", []tr{ {http.MethodPatch, "/data", `[ - {"op": "add", - "path": "/", - "value": {"a": 1, "b": 2} + { + "op": "add", + "path": "/", + "value": {"a": 1, "b": 2} } ]`, 204, ""}, {http.MethodGet, "/data", "", 200, `{"result": {"a": 1, "b": 2}}`}, }}, + {"patch root invalid", []tr{ + {http.MethodPatch, "/data", `[ + { + "op": "add", + "path": "/", + "value": [1,2,3] + } + ]`, 400, ""}, + }}, {"patch invalid", []tr{ {http.MethodPatch, "/data", `[ { @@ -1547,11 +1573,63 @@ p = true { false }` for _, tc := range tests { t.Run(tc.note, func(t *testing.T) { - executeRequests(t, tc.reqs) + test.WithTempFS(nil, func(root string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } + executeRequests(t, tc.reqs, + variant{"inmem", nil}, + variant{"disk", []func(*Server){ + func(s *Server) { + s.WithStore(disk) + }, + }}, + ) + }) }) } } +func 
TestDataV1Metrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } + + f := newFixtureWithStore(t, disk) + put := newReqV1(http.MethodPut, `/data?metrics`, `{"foo":"bar"}`) + f.server.Handler.ServeHTTP(f.recorder, put) + + if f.recorder.Code != 200 { + t.Fatalf("Expected success but got %v", f.recorder) + } + + var result types.DataResponseV1 + err = util.UnmarshalJSON(f.recorder.Body.Bytes(), &result) + if err != nil { + t.Fatalf("Unexpected error while unmarshalling result: %v", err) + } + + assertMetricsExist(t, result.Metrics, []string{ + "counter_disk_read_keys", + "counter_disk_deleted_keys", + "counter_disk_written_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_disk_write_ns", + "timer_disk_commit_ns", + }) + }) +} + func TestConfigV1(t *testing.T) { f := newFixture(t) @@ -1681,105 +1759,119 @@ func TestDataPutV1IfNoneMatch(t *testing.T) { func TestBundleScope(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } - f := newFixture(t) + for _, v := range []variant{ + {"inmem", nil}, + {"disk", []func(*Server){func(s *Server) { s.WithStore(disk) }}}, + } { + t.Run(v.name, func(t *testing.T) { + f := newFixture(t, v.opts...) - txn := storage.NewTransactionOrDie(ctx, f.server.store, storage.WriteParams) + txn := storage.NewTransactionOrDie(ctx, f.server.store, storage.WriteParams) - if err := bundle.WriteManifestToStore(ctx, f.server.store, txn, "test-bundle", bundle.Manifest{ - Revision: "AAAAA", - Roots: &[]string{"a/b/c", "x/y", "foobar"}, - }); err != nil { - t.Fatal(err) - } + if err := bundle.WriteManifestToStore(ctx, f.server.store, txn, "test-bundle", bundle.Manifest{ + Revision: "AAAAA", + Roots: &[]string{"a/b/c", "x/y", "foobar"}, + }); err != nil { + t.Fatal(err) + } - if err := f.server.store.UpsertPolicy(ctx, txn, "someid", []byte(`package x.y.z`)); err != nil { - t.Fatal(err) - } + if err := f.server.store.UpsertPolicy(ctx, txn, "someid", []byte(`package x.y.z`)); err != nil { + t.Fatal(err) + } - if err := f.server.store.Commit(ctx, txn); err != nil { - t.Fatal(err) - } + if err := f.server.store.Commit(ctx, txn); err != nil { + t.Fatal(err) + } - cases := []tr{ - { - method: "PUT", - path: "/data/a/b", - body: "1", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/a/b/c", - body: "1", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/a/b/c/d", - body: "1", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b/c/d is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/a/b/d", - body: "1", - code: http.StatusNoContent, - }, - { - method: "PATCH", - path: "/data/a", - body: `[{"path": "/b/c", "op": "add", "value": 1}]`, - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, - }, - { - 
method: "DELETE", - path: "/data/a", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/policies/test1", - body: `package a.b`, - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, - }, - { - method: "DELETE", - path: "/policies/someid", - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "path x/y/z is owned by bundle \"test-bundle\""}`, - }, - { - method: "PUT", - path: "/data/foo/bar", - body: "1", - code: http.StatusNoContent, - }, - { - method: "PUT", - path: "/data/foo", - body: "1", - code: http.StatusNoContent, - }, - { - method: "PUT", - path: "/data", - body: `{"a": "b"}`, - code: http.StatusBadRequest, - resp: `{"code": "invalid_parameter", "message": "can't write to document root with bundle roots configured"}`, - }, - } + cases := []tr{ + { + method: "PUT", + path: "/data/a/b", + body: "1", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/a/b/c", + body: "1", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/a/b/c/d", + body: "1", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b/c/d is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/a/b/d", + body: "1", + code: http.StatusNoContent, + }, + { + method: "PATCH", + path: "/data/a", + body: `[{"path": "/b/c", "op": "add", "value": 1}]`, + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b/c is owned by bundle \"test-bundle\""}`, + }, + { + method: "DELETE", + path: "/data/a", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/policies/test1", + body: `package a.b`, + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path a/b is owned by bundle \"test-bundle\""}`, + }, + { + method: "DELETE", + path: "/policies/someid", + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "path x/y/z is owned by bundle \"test-bundle\""}`, + }, + { + method: "PUT", + path: "/data/foo/bar", + body: "1", + code: http.StatusNoContent, + }, + { + method: "PUT", + path: "/data/foo", + body: "1", + code: http.StatusNoContent, + }, + { + method: "PUT", + path: "/data", + body: `{"a": "b"}`, + code: http.StatusBadRequest, + resp: `{"code": "invalid_parameter", "message": "can't write to document root with bundle roots configured"}`, + }, + } - if err := f.v1TestRequests(cases); err != nil { - t.Fatal(err) - } + if err := f.v1TestRequests(cases); err != nil { + t.Fatal(err) + } + }) + } + }) } func TestBundleScopeMultiBundle(t *testing.T) { @@ -2304,51 +2396,76 @@ func TestDataProvenanceMultiBundle(t *testing.T) { } func TestDataMetricsEval(t *testing.T) { - f := newFixture(t) + // These tests all use the POST /v1/data API with ?metrics appended. + // We're setting up the disk store because that injects a few extra metrics, + // which storage/inmem does not. 
- // Make a request to evaluate `data` - testDataMetrics(t, f, "/data?metrics", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_query_parse_ns", - "timer_rego_query_compile_ns", - "timer_rego_query_eval_ns", - "timer_server_handler_ns", - "timer_rego_external_resolve_ns", - }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } - // Repeat previous request, expect to have hit the query cache - // so fewer timers should have been reported. - testDataMetrics(t, f, "/data?metrics", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_query_eval_ns", - "timer_server_handler_ns", - "timer_rego_external_resolve_ns", - }) + f := newFixtureWithStore(t, disk) + + // Make a request to evaluate `data` + testDataMetrics(t, f, "/data?metrics", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_query_parse_ns", + "timer_rego_query_compile_ns", + "timer_rego_query_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_rego_external_resolve_ns", + }) - // Make a request to evaluate `data` and use partial evaluation, - // this should not hit the same query cache result as the previous - // request. - testDataMetrics(t, f, "/data?metrics&partial", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_module_compile_ns", - "timer_rego_query_parse_ns", - "timer_rego_query_compile_ns", - "timer_rego_query_eval_ns", - "timer_rego_partial_eval_ns", - "timer_server_handler_ns", - "timer_rego_external_resolve_ns", - }) + // Repeat previous request, expect to have hit the query cache + // so fewer timers should have been reported. + testDataMetrics(t, f, "/data?metrics", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_query_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_rego_external_resolve_ns", + }) + + // Make a request to evaluate `data` and use partial evaluation, + // this should not hit the same query cache result as the previous + // request. 
+ testDataMetrics(t, f, "/data?metrics&partial", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_module_compile_ns", + "timer_rego_query_parse_ns", + "timer_rego_query_compile_ns", + "timer_rego_query_eval_ns", + "timer_rego_partial_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + "timer_rego_external_resolve_ns", + }) - // Repeat previous partial eval request, this time it should - // be cached - testDataMetrics(t, f, "/data?metrics&partial", []string{ - "counter_server_query_cache_hit", - "timer_rego_input_parse_ns", - "timer_rego_query_eval_ns", - "timer_server_handler_ns", + // Repeat previous partial eval request, this time it should + // be cached + testDataMetrics(t, f, "/data?metrics&partial", []string{ + "counter_server_query_cache_hit", + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_input_parse_ns", + "timer_rego_query_eval_ns", + "timer_server_handler_ns", + "timer_disk_read_ns", + }) }) } @@ -2363,9 +2480,14 @@ func testDataMetrics(t *testing.T, f *fixture, url string, expected []string) { if err := util.NewJSONDecoder(f.recorder.Body).Decode(&result); err != nil { t.Fatalf("Unexpected JSON decode error: %v", err) } + assertMetricsExist(t, result.Metrics, expected) +} + +func assertMetricsExist(t *testing.T, metrics types.MetricsV1, expected []string) { + t.Helper() for _, key := range expected { - v, ok := result.Metrics[key] + v, ok := metrics[key] if !ok { t.Errorf("Missing expected metric: %s", key) } else if v == nil { @@ -2374,8 +2496,8 @@ func testDataMetrics(t *testing.T, f *fixture, url string, expected []string) { } - if len(expected) != len(result.Metrics) { - t.Errorf("Expected %d metrics, got %d\n\n\tValues: %+v", len(expected), len(result.Metrics), result.Metrics) + if len(expected) != len(metrics) { + t.Errorf("Expected %d metrics, got %d\n\n\tValues: %+v", len(expected), len(metrics), metrics) } } @@ -3139,31 +3261,50 @@ func TestDecisionLogErrorMessage(t *testing.T) { } func TestQueryV1(t *testing.T) { - f := newFixture(t) - get := newReqV1(http.MethodGet, `/query?q=a=[1,2,3]%3Ba[i]=x`, "") - f.server.Handler.ServeHTTP(f.recorder, get) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + test.WithTempFS(nil, func(root string) { + disk, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{Dir: root}) + if err != nil { + t.Fatal(err) + } - if f.recorder.Code != 200 { - t.Fatalf("Expected success but got %v", f.recorder) - } + f := newFixtureWithStore(t, disk) + get := newReqV1(http.MethodGet, `/query?q=a=[1,2,3]%3Ba[i]=x&metrics`, "") + f.server.Handler.ServeHTTP(f.recorder, get) - var expected types.QueryResponseV1 - err := util.UnmarshalJSON([]byte(`{ + if f.recorder.Code != 200 { + t.Fatalf("Expected success but got %v", f.recorder) + } + + var expected types.QueryResponseV1 + err = util.UnmarshalJSON([]byte(`{ "result": [{"a":[1,2,3],"i":0,"x":1},{"a":[1,2,3],"i":1,"x":2},{"a":[1,2,3],"i":2,"x":3}] }`), &expected) - if err != nil { - panic(err) - } + if err != nil { + panic(err) + } - var result types.QueryResponseV1 - err = util.UnmarshalJSON(f.recorder.Body.Bytes(), &result) - if err != nil { - t.Fatalf("Unexpected error while unmarshalling result: %v", err) - } + var result types.QueryResponseV1 + err = util.UnmarshalJSON(f.recorder.Body.Bytes(), &result) + if err != nil { + t.Fatalf("Unexpected error while unmarshalling result: %v", err) + } - if !reflect.DeepEqual(result, expected) 
{ - t.Fatalf("Expected %v but got: %v", expected, result) - } + assertMetricsExist(t, result.Metrics, []string{ + "counter_disk_read_keys", + "counter_disk_read_bytes", + "timer_rego_query_compile_ns", + "timer_rego_query_eval_ns", + // "timer_server_handler_ns", // TODO(sr): we're not consistent about timing this? + "timer_disk_read_ns", + }) + + result.Metrics = nil + if !reflect.DeepEqual(result, expected) { + t.Fatalf("Expected %v but got: %v", expected, result) + } + }) } func TestBadQueryV1(t *testing.T) { @@ -3620,26 +3761,24 @@ type fixture struct { func newFixture(t *testing.T, opts ...func(*Server)) *fixture { ctx := context.Background() - store := inmem.New() - m, err := plugins.New([]byte{}, "test", store) - if err != nil { - panic(err) - } - - if err := m.Start(ctx); err != nil { - panic(err) - } - server := New(). WithAddresses([]string{"localhost:8182"}). - WithStore(store). - WithManager(m) + WithStore(inmem.New()) // potentially overridden via opts for _, opt := range opts { opt(server) } + + m, err := plugins.New([]byte{}, "test", server.store) + if err != nil { + t.Fatal(err) + } + server = server.WithManager(m) + if err := m.Start(ctx); err != nil { + t.Fatal(err) + } server, err = server.Init(ctx) if err != nil { - panic(err) + t.Fatal(err) } recorder := httptest.NewRecorder() @@ -3684,7 +3823,7 @@ func newFixtureWithStore(t *testing.T, store storage.Store, opts ...func(*Server func (f *fixture) v1TestRequests(trs []tr) error { for i, tr := range trs { if err := f.v1(tr.method, tr.path, tr.body, tr.code, tr.resp); err != nil { - return errors.Wrapf(err, "error on test request #%d", i+1) + return fmt.Errorf("error on test request #%d: %w", i+1, err) } } return nil @@ -3750,13 +3889,22 @@ func (f *fixture) reset() { f.recorder = httptest.NewRecorder() } -func executeRequests(t *testing.T, reqs []tr) { +type variant struct { + name string + opts []func(*Server) +} + +func executeRequests(t *testing.T, reqs []tr, variants ...variant) { t.Helper() - f := newFixture(t) - for i, req := range reqs { - if err := f.v1(req.method, req.path, req.body, req.code, req.resp); err != nil { - t.Errorf("Unexpected response on request %d: %v", i+1, err) - } + for _, v := range variants { + t.Run(v.name, func(t *testing.T) { + f := newFixture(t, v.opts...) 
+ for i, req := range reqs { + if err := f.v1(req.method, req.path, req.body, req.code, req.resp); err != nil { + t.Errorf("Unexpected response on request %d: %v", i+1, err) + } + } + }) } } @@ -4145,17 +4293,17 @@ func TestDistributedTracingDisabled(t *testing.T) { type mockHTTPHandler struct{} -func (m *mockHTTPHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { +func (*mockHTTPHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) } type mockMetricsProvider struct{} -func (m *mockMetricsProvider) RegisterEndpoints(registrar func(path, method string, handler http.Handler)) { +func (*mockMetricsProvider) RegisterEndpoints(registrar func(string, string, http.Handler)) { registrar("/metrics", "GET", new(mockHTTPHandler)) } -func (m *mockMetricsProvider) InstrumentHandler(handler http.Handler, label string) http.Handler { +func (*mockMetricsProvider) InstrumentHandler(handler http.Handler, _ string) http.Handler { return handler } @@ -4167,21 +4315,19 @@ type mockHTTPListener struct { t httpListenerType } -var _ httpListener = (*mockHTTPListener)(nil) - func (m mockHTTPListener) Addr() string { return m.addrs } -func (m mockHTTPListener) ListenAndServe() error { +func (mockHTTPListener) ListenAndServe() error { return errors.New("not implemented") } -func (m mockHTTPListener) ListenAndServeTLS(certFile, keyFile string) error { +func (mockHTTPListener) ListenAndServeTLS(string, string) error { return errors.New("not implemented") } -func (m mockHTTPListener) Shutdown(ctx context.Context) error { +func (m mockHTTPListener) Shutdown(context.Context) error { var err error if m.shutdownHook != nil { err = m.shutdownHook() diff --git a/storage/disk/config.go b/storage/disk/config.go new file mode 100644 index 0000000000..e1019dbde6 --- /dev/null +++ b/storage/disk/config.go @@ -0,0 +1,75 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "errors" + "fmt" + "os" + + badger "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/config" + "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/util" +) + +type cfg struct { + Dir string `json:"directory"` + AutoCreate bool `json:"auto_create"` + Partitions []string `json:"partitions"` + Badger string `json:"badger"` +} + +var ErrInvalidPartitionPath = errors.New("invalid storage path") + +// OptionsFromConfig parses the passed config, extracts the disk storage +// settings, validates it, and returns a *Options struct pointer on success. 
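+// A nil *Options together with a nil error means the configuration contains
+// no disk storage section; the caller is expected to fall back to the
+// in-memory store in that case.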
+func OptionsFromConfig(raw []byte, id string) (*Options, error) { + parsedConfig, err := config.ParseConfig(raw, id) + if err != nil { + return nil, err + } + + if parsedConfig.Storage == nil || len(parsedConfig.Storage.Disk) == 0 { + return nil, nil + } + + var c cfg + if err := util.Unmarshal(parsedConfig.Storage.Disk, &c); err != nil { + return nil, err + } + + if _, err := os.Stat(c.Dir); err != nil { + if os.IsNotExist(err) && c.AutoCreate { + err = os.MkdirAll(c.Dir, 0700) // overwrite err + } + if err != nil { + return nil, fmt.Errorf("directory %v invalid: %w", c.Dir, err) + } + } + + opts := Options{ + Dir: c.Dir, + Badger: c.Badger, + } + for _, path := range c.Partitions { + p, ok := storage.ParsePath(path) + if !ok { + return nil, fmt.Errorf("partition path '%v': %w", path, ErrInvalidPartitionPath) + } + opts.Partitions = append(opts.Partitions, p) + } + + return &opts, nil +} + +func badgerConfigFromOptions(opts Options) badger.Options { + // Set some things _after_ FromSuperFlag to prohibit overriding them + return badger.DefaultOptions(""). + FromSuperFlag(opts.Badger). + WithDir(dataDir(opts.Dir)). + WithValueDir(dataDir(opts.Dir)). + WithDetectConflicts(false) // We only allow one write txn at a time; so conflicts cannot happen. +} diff --git a/storage/disk/config_test.go b/storage/disk/config_test.go new file mode 100644 index 0000000000..7811c123bb --- /dev/null +++ b/storage/disk/config_test.go @@ -0,0 +1,238 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "context" + "errors" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/logging" +) + +func TestNewFromConfig(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "disk_test") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { os.RemoveAll(tmpdir) }) + + for _, tc := range []struct { + note string + config string + err error // gets unwrapped + nothing bool // returns no disk options? 
+ }{ + { + note: "no storage section", + config: "", + nothing: true, + }, + { + note: "successful init, no partitions", + config: ` +storage: + disk: + directory: "` + tmpdir + `" +`, + }, + { + note: "successful init, valid partitions", + config: ` +storage: + disk: + directory: "` + tmpdir + `" + partitions: + - /foo/bar + - /baz +`, + }, + { + note: "partitions invalid", + config: ` +storage: + disk: + directory: "` + tmpdir + `" + partitions: + - /foo/bar + - baz +`, + err: ErrInvalidPartitionPath, + }, + { + note: "directory does not exist", + config: ` +storage: + disk: + directory: "` + tmpdir + `/foobar" +`, + err: os.ErrNotExist, + }, + { + note: "auto-create directory, does not exist", + config: ` +storage: + disk: + auto_create: true + directory: "` + tmpdir + `/foobar" +`, + }, + { + note: "auto-create directory, does already exist", // could be the second run + config: ` +storage: + disk: + auto_create: true + directory: "` + tmpdir + `" +`, + }, + } { + t.Run(tc.note, func(t *testing.T) { + d, err := OptionsFromConfig([]byte(tc.config), "id") + if !errors.Is(err, tc.err) { + t.Errorf("err: expected %v, got %v", tc.err, err) + } + if tc.nothing && d != nil { + t.Errorf("expected no disk options, got %v", d) + } + }) + } +} + +func TestDataDirPrefix(t *testing.T) { + ctx := context.Background() + tmpdir, err := ioutil.TempDir("", "disk_test") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { os.RemoveAll(tmpdir) }) + + d, err := New(ctx, logging.NewNoOpLogger(), nil, Options{ + Dir: tmpdir, + }) + if err != nil { + t.Fatal(err) + } + d.Close(ctx) + + dir := filepath.Join(tmpdir, "data") + if _, err := os.Stat(dir); err != nil { + t.Fatalf("stat %v: %v", dir, err) + } + files, err := os.ReadDir(tmpdir) + if err != nil { + t.Fatal(err) + } + + // We currently only expect a single directory here: "data" + for _, file := range files { + if file.Name() != "data" { + t.Errorf("unexpected file in dir: %v", file.Name()) + } + } +} + +func TestBadgerConfigFromOptions(t *testing.T) { + type check func(*testing.T, badger.Options) + checks := func(c ...check) []check { + return c + } + valueDir := func(exp string) check { + return func(t *testing.T, o badger.Options) { + if act := o.ValueDir; act != exp { + t.Errorf("ValueDir: expected %v, got %v", exp, act) + } + } + } + dir := func(exp string) check { + return func(t *testing.T, o badger.Options) { + if act := o.Dir; act != exp { + t.Errorf("Dir: expected %v, got %v", exp, act) + } + } + } + conflict := func(exp bool) check { + return func(t *testing.T, o badger.Options) { + if act := o.DetectConflicts; act != exp { + t.Errorf("DetectConflicts: expected %v, got %v", exp, act) + } + } + } + nummemtables := func(exp int) check { + return func(t *testing.T, o badger.Options) { + if act := o.NumMemtables; act != exp { + t.Errorf("Dir: expected %v, got %v", exp, act) + } + } + } + numversionstokeep := func(exp int) check { + return func(t *testing.T, o badger.Options) { + if act := o.NumVersionsToKeep; act != exp { + t.Errorf("Dir: expected %v, got %v", exp, act) + } + } + } + + tests := []struct { + note string + opts Options + checks []check + }{ + { + "defaults", + Options{ + Dir: "foo", + }, + checks( + valueDir("foo/data"), + dir("foo/data"), + conflict(false), + ), + }, + { + "valuedir+dir override", + Options{ + Dir: "foo", + Badger: `valuedir="baz"; dir="quz"`, + }, + checks( + valueDir("foo/data"), + dir("foo/data"), + ), + }, + { + "conflict detection override", + Options{ + Dir: "foo", + Badger: `detectconflicts=true`, + 
}, + checks(conflict(false)), + }, + { + "two valid overrides", // NOTE(sr): This is just one example + Options{ + Dir: "foo", + Badger: `nummemtables=123; numversionstokeep=123`, + }, + checks( + nummemtables(123), + numversionstokeep(123), + ), + }, + } + + for _, tc := range tests { + t.Run(tc.note, func(t *testing.T) { + act := badgerConfigFromOptions(tc.opts) + for _, check := range tc.checks { + check(t, act) + } + }) + } +} diff --git a/storage/disk/disk.go b/storage/disk/disk.go index 348499173a..9e48c36bc2 100644 --- a/storage/disk/disk.go +++ b/storage/disk/disk.go @@ -26,6 +26,10 @@ // to a single key). Similarly, values that fall outside of partitions are // stored under individual keys at the root (e.g., the full extent of the value // at /qux would be stored under one key.) +// There is support for wildcards in partitions: {/foo/*} will cause /foo/bar/abc +// and /foo/buz/def to be written to separate keys. Multiple wildcards are +// supported (/tenants/*/users/*/bindings), and they can also appear at the end +// of a partition (/users/*). // // All keys written by the disk.Store implementation are prefixed as follows: // @@ -57,32 +61,45 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" + "strings" "sync" "sync/atomic" + "time" badger "github.com/dgraph-io/badger/v3" + "github.com/prometheus/client_golang/prometheus" + + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/util" ) -// TODO(tsandall): deal w/ slashes in paths -// TODO(tsandall): support multi-level partitioning for use cases like k8s // TODO(tsandall): add support for migrations +// TODO(sr): validate partition patterns properly: if the partitions were {/foo/bar} +// before and the new ones are {/foo/*}, it should be OK. + +// a value log file be rewritten if half the space can be discarded +const valueLogGCDiscardRatio = 0.5 // Options contains parameters that configure the disk-based store. type Options struct { Dir string // specifies directory to store data inside of Partitions []storage.Path // data prefixes that enable efficient layout + Badger string // badger-internal configurables } // Store provides a disk-based implementation of the storage.Store interface. type Store struct { db *badger.DB // underlying key-value store xid uint64 // next transaction id - mu sync.Mutex // synchronizes trigger execution + rmu sync.RWMutex // reader-writer lock + wmu sync.Mutex // writer lock pm *pathMapper // maps logical storage paths to underlying store keys partitions *partitionTrie // data structure to support path mapping triggers map[*handle]struct{} // registered triggers + gcTicker *time.Ticker // gc ticker + close chan struct{} // close-only channel for stopping the GC goroutine } const ( @@ -105,7 +122,7 @@ type metadata struct { } // New returns a new disk-based store based on the provided options. 
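+// The logger receives the underlying storage engine's log output (emitted
+// at debug level), and prom, if non-nil, is used to register disk-storage
+// metrics with Prometheus.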
-func New(ctx context.Context, opts Options) (*Store, error) { +func New(ctx context.Context, logger logging.Logger, prom prometheus.Registerer, opts Options) (*Store, error) { partitions := make(pathSet, len(opts.Partitions)) copy(partitions, opts.Partitions) @@ -118,27 +135,73 @@ func New(ctx context.Context, opts Options) (*Store, error) { } } - db, err := badger.Open(badger.DefaultOptions(opts.Dir).WithLogger(nil)) + db, err := badger.Open(badgerConfigFromOptions(opts).WithLogger(&wrap{logger})) if err != nil { return nil, wrapError(err) } + if prom != nil { + if err := initPrometheus(prom); err != nil { + return nil, err + } + } + store := &Store{ db: db, partitions: buildPartitionTrie(partitions), triggers: map[*handle]struct{}{}, + close: make(chan struct{}), + gcTicker: time.NewTicker(time.Minute), } - return store, db.Update(func(txn *badger.Txn) error { + go store.GC(logger) + + if err := db.Update(func(txn *badger.Txn) error { return store.init(ctx, txn, partitions) - }) + }); err != nil { + store.Close(ctx) + return nil, err + } + + return store, store.diagnostics(ctx, partitions, logger) +} + +func (db *Store) GC(logger logging.Logger) { + for { + select { + case <-db.close: + return + case <-db.gcTicker.C: + for err := error(nil); err == nil; err = db.db.RunValueLogGC(valueLogGCDiscardRatio) { + logger.Debug("RunValueLogGC: err=%v", err) + } + } + } } // Close finishes the DB connection and allows other processes to acquire it. func (db *Store) Close(context.Context) error { + db.gcTicker.Stop() return wrapError(db.db.Close()) } +// If the log level is debug, we'll output the badger logs in their corresponding +// log levels; if it's not debug, we'll suppress all badger logs. +type wrap struct { + l logging.Logger +} + +func (w *wrap) debugDo(f func(string, ...interface{}), fmt string, as ...interface{}) { + if w.l.GetLevel() >= logging.Debug { + f("badger: "+fmt, as...) + } +} + +func (w *wrap) Debugf(f string, as ...interface{}) { w.debugDo(w.l.Debug, f, as...) } +func (w *wrap) Infof(f string, as ...interface{}) { w.debugDo(w.l.Info, f, as...) } +func (w *wrap) Warningf(f string, as ...interface{}) { w.debugDo(w.l.Warn, f, as...) } +func (w *wrap) Errorf(f string, as ...interface{}) { w.debugDo(w.l.Error, f, as...) } + // NewTransaction implements the storage.Store interface. 
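+// At most one write transaction may be open at a time (writers block on wmu,
+// readers share rmu). An illustrative call, using the storage.Txn helper
+// (value stands for any JSON-serializable Go value):
+//
+//	err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error {
+//		return store.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/users/alice"), value)
+//	})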
func (db *Store) NewTransaction(ctx context.Context, params ...storage.TransactionParams) (storage.Transaction, error) { var write bool @@ -150,6 +213,11 @@ func (db *Store) NewTransaction(ctx context.Context, params ...storage.Transacti } xid := atomic.AddUint64(&db.xid, uint64(1)) + if write { + db.wmu.Lock() // only one concurrent write txn + } else { + db.rmu.RLock() + } underlying := db.db.NewTransaction(write) return newTransaction(xid, write, underlying, context, db.pm, db.partitions, db), nil @@ -162,17 +230,23 @@ func (db *Store) Commit(ctx context.Context, txn storage.Transaction) error { return err } if underlying.write { + db.rmu.Lock() // blocks until all readers are done event, err := underlying.Commit(ctx) if err != nil { return err } - db.mu.Lock() - defer db.mu.Unlock() + write := false // read only txn + readOnly := db.db.NewTransaction(write) + xid := atomic.AddUint64(&db.xid, uint64(1)) + readTxn := newTransaction(xid, write, readOnly, nil, db.pm, db.partitions, db) for h := range db.triggers { - h.cb(ctx, txn, event) + h.cb(ctx, readTxn, event) } - } else { + db.rmu.Unlock() + db.wmu.Unlock() + } else { // committing read txn underlying.Abort(ctx) + db.rmu.RUnlock() } return nil } @@ -184,6 +258,11 @@ func (db *Store) Abort(ctx context.Context, txn storage.Transaction) { panic(err) } underlying.Abort(ctx) + if underlying.write { + db.wmu.Unlock() + } else { + db.rmu.RUnlock() + } } // ListPolicies implements the storage.Policy interface. @@ -238,8 +317,6 @@ func (db *Store) Register(_ context.Context, txn storage.Transaction, config sto } } h := &handle{db: db, cb: config.OnCommit} - db.mu.Lock() - defer db.mu.Unlock() db.triggers[h] = struct{}{} return h, nil } @@ -305,9 +382,7 @@ func (h *handle) Unregister(ctx context.Context, txn storage.Transaction) { Message: "triggers must be unregistered with a write transaction", }) } - h.db.mu.Lock() delete(h.db.triggers, h) - h.db.mu.Unlock() } func (db *Store) loadMetadata(txn *badger.Txn, m *metadata) (bool, error) { @@ -424,3 +499,163 @@ func (db *Store) validatePartitions(ctx context.Context, txn *badger.Txn, existi return nil } + +// MakeDir makes Store a storage.MakeDirer, to avoid the superfluous MakeDir +// steps -- MakeDir is implicit in the disk storage's data layout, since +// {"foo": {"bar": {"baz": 10}}} +// writes value `10` to key `/foo/bar/baz`. +// +// Here, we only check if it's a write transaction, for consistency with +// other implementations, and do nothing. +func (db *Store) MakeDir(_ context.Context, txn storage.Transaction, path storage.Path) error { + underlying, err := db.underlying(txn) + if err != nil { + return err + } + if !underlying.write { + return &storage.Error{ + Code: storage.InvalidTransactionErr, + Message: "MakeDir must be called with a write transaction", + } + } + return nil +} + +// diagnostics prints relevant partition and database related information at +// debug level. 
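+// For instance, with a single key stored under a configured partition /foo,
+// the startup logs contain a line like (values taken from the tests below):
+//
+//	partition /foo: key count: 1 (estimated size 21 bytes)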
+func (db *Store) diagnostics(ctx context.Context, partitions pathSet, logger logging.Logger) error { + if logger.GetLevel() < logging.Debug { + return nil + } + if len(partitions) == 0 { + logger.Warn("no partitions configured") + if err := db.logPrefixStatistics(ctx, storage.MustParsePath("/"), logger); err != nil { + return err + } + } + for _, partition := range partitions { + if err := db.logPrefixStatistics(ctx, partition, logger); err != nil { + return err + } + } + return nil +} + +func (db *Store) logPrefixStatistics(ctx context.Context, partition storage.Path, logger logging.Logger) error { + + if prefix, ok := hasWildcard(partition); ok { + return db.logPrefixStatisticsWildcardPartition(ctx, prefix, partition, logger) + } + + key, err := db.pm.DataPrefix2Key(partition) + if err != nil { + return err + } + + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + opt.Prefix = key + + var count, size uint64 + if err := db.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + if err := ctx.Err(); err != nil { + return err + } + count++ + size += uint64(it.Item().EstimatedSize()) // key length + value length + } + return nil + }); err != nil { + return err + } + logger.Debug("partition %s: key count: %d (estimated size %d bytes)", partition, count, size) + return nil +} + +func hasWildcard(path storage.Path) (storage.Path, bool) { + for i := range path { + if path[i] == pathWildcard { + return path[:i], true + } + } + return nil, false +} + +func (db *Store) logPrefixStatisticsWildcardPartition(ctx context.Context, prefix, partition storage.Path, logger logging.Logger) error { + // we iterate all keys, and count things according to their concrete partition + type diagInfo struct{ count, size uint64 } + diag := map[string]*diagInfo{} + + key, err := db.pm.DataPrefix2Key(prefix) + if err != nil { + return err + } + + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + opt.Prefix = key + if err := db.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + if err := ctx.Err(); err != nil { + return err + } + if part, ok := db.prefixInPattern(it.Item().Key(), partition); ok { + p := part.String() + if diag[p] == nil { + diag[p] = &diagInfo{} + } + diag[p].count++ + diag[p].size += uint64(it.Item().EstimatedSize()) // key length + value length + } + } + return nil + }); err != nil { + return err + } + if len(diag) == 0 { + logger.Debug("partition pattern %s: key count: 0 (estimated size 0 bytes)", toString(partition)) + } + for part, diag := range diag { + logger.Debug("partition %s (pattern %s): key count: %d (estimated size %d bytes)", part, toString(partition), diag.count, diag.size) + } + return nil +} + +func (db *Store) prefixInPattern(key []byte, partition storage.Path) (storage.Path, bool) { + var part storage.Path + path, err := db.pm.DataKey2Path(key) + if err != nil { + return nil, false + } + for i := range partition { + if path[i] != partition[i] && partition[i] != pathWildcard { + return nil, false + } + part = append(part, path[i]) + } + return part, true +} + +func toString(path storage.Path) string { + if len(path) == 0 { + return "/" + } + buf := strings.Builder{} + for _, p := range path { + fmt.Fprintf(&buf, "/%s", p) + } + return buf.String() +} + +// dataDir prefixes the configured storage location: what it returns is +// what we have badger write its files to. 
It is done to give us some +// wiggle room in the future should we need to put further files on the +// file system (like backups): we can then just use the opts.Dir. +func dataDir(dir string) string { + return filepath.Join(dir, "data") +} diff --git a/storage/disk/disk_test.go b/storage/disk/disk_test.go index 4a097fae2e..290878568a 100644 --- a/storage/disk/disk_test.go +++ b/storage/disk/disk_test.go @@ -7,9 +7,13 @@ package disk import ( "bytes" "context" + "reflect" "strings" "testing" + badger "github.com/dgraph-io/badger/v3" + + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/util/test" @@ -32,11 +36,69 @@ type testWriteError struct { value string } +// testCount lets you assert the number of keys under a prefix. +// Note that we don't do exact matches, so the assertions should be +// as exact as possible: +// testCount{"/foo", 1} +// testCount{"/foo/bar", 1} +// both of these would be true for one element under key `/foo/bar`. +type testCount struct { + key string + count int +} + +func (tc *testCount) assert(t *testing.T, s *Store) { + t.Helper() + key, err := s.pm.DataPath2Key(storage.MustParsePath(tc.key)) + if err != nil { + t.Fatal(err) + } + + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + opt.Prefix = key + + var count int + if err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + count++ + } + return nil + }); err != nil { + t.Fatal(err) + } + + if tc.count != count { + t.Errorf("key %v: expected %d keys, found %d", tc.key, tc.count, count) + } +} + +type testDump struct{} // for debugging purposes + +func (*testDump) do(t *testing.T, s *Store) { + t.Helper() + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = true + + if err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + t.Logf("%v -> %v", string(it.Item().Key()), it.Item().ValueSize()) + } + return nil + }); err != nil { + t.Fatal(err) + } +} + func TestPolicies(t *testing.T) { test.WithTempFS(map[string]string{}, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: nil}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: nil}) if err != nil { t.Fatal(err) } @@ -101,6 +163,9 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn := func(ctx context.Context, s *Store) { t.Helper() + if s == nil { + return + } if err := s.Close(ctx); err != nil { t.Fatal(err) } @@ -110,7 +175,7 @@ func TestDataPartitioningValidation(t *testing.T) { ctx := context.Background() - _, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + _, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/bar/baz"), }}) @@ -123,7 +188,7 @@ func TestDataPartitioningValidation(t *testing.T) { t.Fatal("unexpected code or message, got:", err) } - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/baz"), }}) @@ -133,7 +198,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, 
logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), }}) @@ -143,7 +208,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -164,7 +229,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux/corge"), @@ -175,7 +240,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -187,7 +252,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -199,7 +264,7 @@ func TestDataPartitioningValidation(t *testing.T) { closeFn(ctx, s) - s, err = New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err = New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo/baz"), storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/qux"), @@ -233,6 +298,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo/bar", exp: `"x"`, }, + testCount{"/foo/bar", 1}, }, }, { @@ -248,6 +314,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo/bar/baz", exp: `"x"`, }, + testCount{"/foo/bar/baz", 1}, }, }, { @@ -263,6 +330,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/deadbeef", exp: `"x"`, }, + testCount{"/foo", 0}, + testCount{"/deadbeef", 1}, }, }, { @@ -302,6 +371,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo/bar/baz", exp: `8`, }, + testCount{"/foo/bar", 1}, + testCount{"/foo/bar/baz", 0}, }, }, { @@ -365,7 +436,27 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, { - note: "read-modify-write: add: array overwrite", + note: "read-modify-write: add: array append (via last index)", + partitions: []string{"/foo"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/bar", + value: `[1]`, + }, + testWrite{ + op: storage.AddOp, + path: "/foo/bar/1", + value: `8`, + }, + testRead{ + path: "/foo/bar", + exp: `[1, 8]`, + }, + }, + }, + { + note: "read-modify-write: add: array insert", partitions: []string{"/foo"}, sequence: []interface{}{ testWrite{ @@ -380,7 +471,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, testRead{ path: "/foo/bar", - exp: `[8]`, + exp: `[8, 7]`, }, }, }, @@ -404,6 +495,26 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, }, + { + note: "read-modify-write: replace: array", + partitions: []string{"/foo"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: 
"/foo/bar", + value: `[7]`, + }, + testWrite{ + op: storage.ReplaceOp, + path: "/foo/bar/0", + value: `8`, + }, + testRead{ + path: "/foo/bar", + exp: `[8]`, + }, + }, + }, { note: "read-modify-write: remove", partitions: []string{"/foo"}, @@ -423,6 +534,25 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, }, + { + note: "read-modify-write: remove: array", + partitions: []string{"/foo"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/bar", + value: `[7, 8]`, + }, + testWrite{ + op: storage.RemoveOp, + path: "/foo/bar/0", + }, + testRead{ + path: "/foo/bar", + exp: `[8]`, + }, + }, + }, { note: "read-modify-write: multi-level: map", partitions: []string{"/foo"}, @@ -472,6 +602,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { path: "/foo", value: `{"bar": 7, "baz": 8}`, }, + + testCount{"/foo", 2}, testRead{ path: "/foo/bar", exp: `7`, @@ -592,6 +724,87 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { }, }, }, + { + note: "pattern partitions: middle wildcard: match", + partitions: []string{"/foo/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/a/bar", + value: `{"baz": 7}`, + }, + testCount{"/foo/a/bar/baz", 1}, + testRead{"/foo/a/bar/baz", `7`}, + }, + }, + { + note: "pattern partitions: middle wildcard: no-match", + partitions: []string{"/foo/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/b/baz", + value: `{"quz": 1}`, + }, + testCount{"/foo/b/baz", 1}, + testCount{"/foo/b/baz/quz", 0}, + testRead{"/foo/b/baz/quz", `1`}, + }, + }, + { + note: "pattern partitions: middle wildcard: partial match", + partitions: []string{"/foo/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/b", + value: `{"bar": {"quz": 1}, "x": "y"}`, + }, + testCount{"/foo/b/bar/quz", 1}, + testRead{"/foo/b/bar/quz", `1`}, + testCount{"/foo/b/x", 1}, + testRead{"/foo/b/x", `"y"`}, + }, + }, + { + note: "pattern partitions: 2x middle wildcard: partial match", + partitions: []string{"/foo/*/*/bar"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/foo/b/c", + value: `{"bar": {"quz": 1}, "x": "y"}`, + }, + testCount{"/foo/b/c/bar/quz", 1}, + testRead{"/foo/b/c/bar/quz", `1`}, + testCount{"/foo/b/c/x", 1}, + testRead{"/foo/b/c/x", `"y"`}, + }, + }, + { + note: "pattern partitions: wildcard at the end", + partitions: []string{"/users/*"}, + sequence: []interface{}{ + testWrite{ + op: storage.AddOp, + path: "/users", + value: `{"alice": {"bar": {"quz": 1}, "x": "y"}}`, + }, + testWrite{ + op: storage.AddOp, + path: "/users/bob", + value: `{"baz": {"one": 1}, "y": "x"}`, + }, + testCount{"/users/alice/bar", 1}, + testRead{"/users/alice/bar/quz", `1`}, + testCount{"/users/alice/x", 1}, + testRead{"/users/alice/x", `"y"`}, + testCount{"/users/bob/baz", 1}, + testRead{"/users/bob/baz/one", `1`}, + testCount{"/users/bob/y", 1}, + testRead{"/users/bob/y", `"x"`}, + }, + }, } for _, tc := range tests { @@ -604,7 +817,7 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { } ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: partitions}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: partitions}) if err != nil { t.Fatal(err) } @@ -612,6 +825,8 @@ func TestDataPartitioningReadsAndWrites(t *testing.T) { for _, x := range tc.sequence { switch x := x.(type) { + case testCount: + x.assert(t, s) case testWrite: executeTestWrite(ctx, t, s, x) case testRead: @@ -626,6 +841,8 @@ func 
TestDataPartitioningReadsAndWrites(t *testing.T) { if cmp := util.Compare(result, exp); cmp != 0 { t.Fatalf("expected %v but got %v", x.exp, result) } + case testDump: + x.do(t, s) default: panic("unexpected type") } @@ -752,7 +969,7 @@ func TestDataPartitioningReadNotFoundErrors(t *testing.T) { } ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: partitions}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: partitions}) if err != nil { t.Fatal(err) } @@ -786,60 +1003,6 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { partitions []string sequence []interface{} }{ - { - note: "unpartitioned", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWriteError{ - op: storage.AddOp, - path: "/does/notexist", - value: `7`, - }, - }, - }, - { - note: "unpartitioned: nested", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWrite{ - op: storage.AddOp, - path: "/deadbeef", - value: `{}`, - }, - testWriteError{ - op: storage.AddOp, - path: "/deadbeef/x/y", - value: `{}`, - }, - }, - }, - { - note: "partitioned: nested", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWriteError{ - op: storage.AddOp, - path: "/foo/bar/baz", - value: `{}`, - }, - }, - }, - { - note: "partitioned: nested: 2-level", - partitions: []string{"/foo"}, - sequence: []interface{}{ - testWrite{ - op: storage.AddOp, - path: "/foo/bar", - value: `{}`, - }, - testWriteError{ - op: storage.AddOp, - path: "/foo/bar/baz/qux", - value: `7`, - }, - }, - }, { note: "patch: remove: non-existent key", partitions: []string{}, @@ -958,7 +1121,7 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { } ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: partitions}) + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: partitions}) if err != nil { t.Fatal(err) @@ -994,7 +1157,7 @@ func TestDataPartitioningWriteNotFoundErrors(t *testing.T) { func TestDataPartitioningWriteInvalidPatchError(t *testing.T) { test.WithTempFS(map[string]string{}, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo"), }}) if err != nil { @@ -1042,3 +1205,274 @@ func executeTestWrite(ctx context.Context, t *testing.T, s storage.Store, x test t.Fatal(err) } } + +func TestDiskTriggers(t *testing.T) { + test.WithTempFS(map[string]string{}, func(dir string) { + ctx := context.Background() + store, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ + storage.MustParsePath("/foo"), + }}) + if err != nil { + t.Fatal(err) + } + defer store.Close(ctx) + writeTxn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams) + readTxn := storage.NewTransactionOrDie(ctx, store) + + _, err = store.Register(ctx, readTxn, storage.TriggerConfig{ + OnCommit: func(context.Context, storage.Transaction, storage.TriggerEvent) {}, + }) + + if err == nil || !storage.IsInvalidTransaction(err) { + t.Fatalf("Expected transaction error: %v", err) + } + + store.Abort(ctx, readTxn) + + var event storage.TriggerEvent + modifiedPath := storage.MustParsePath("/a") + expectedValue := "hello" + + _, err = store.Register(ctx, writeTxn, storage.TriggerConfig{ + OnCommit: func(ctx context.Context, txn storage.Transaction, evt storage.TriggerEvent) { + result, err := store.Read(ctx, txn, modifiedPath) 
+			if err != nil || !reflect.DeepEqual(result, expectedValue) {
+				t.Fatalf("Expected result to be hello for trigger read but got: %v (err: %v)", result, err)
+			}
+			event = evt
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to register callback: %v", err)
+	}
+
+	if err := store.Write(ctx, writeTxn, storage.ReplaceOp, modifiedPath, expectedValue); err != nil {
+		t.Fatalf("Unexpected write error: %v", err)
+	}
+
+	id := "test"
+	data := []byte("package abc")
+	if err := store.UpsertPolicy(ctx, writeTxn, id, data); err != nil {
+		t.Fatalf("Unexpected upsert error: %v", err)
+	}
+
+	if err := store.Commit(ctx, writeTxn); err != nil {
+		t.Fatalf("Unexpected commit error: %v", err)
+	}
+
+	if event.IsZero() || !event.PolicyChanged() || !event.DataChanged() {
+		t.Fatalf("Expected policy and data change but got: %v", event)
+	}
+
+	expData := storage.DataEvent{Path: modifiedPath, Data: expectedValue, Removed: false}
+	if d := event.Data[0]; !reflect.DeepEqual(expData, d) {
+		t.Fatalf("Expected data event %v, got %v", expData, d)
+	}
+
+	expPolicy := storage.PolicyEvent{ID: id, Data: data, Removed: false}
+	if p := event.Policy[0]; !reflect.DeepEqual(expPolicy, p) {
+		t.Fatalf("Expected policy event %v, got %v", expPolicy, p)
+	}
+	})
+}
+
+func TestDiskDiagnostics(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("no partitions", func(t *testing.T) {
+		test.WithTempFS(nil, func(dir string) {
+			buf := bytes.Buffer{}
+			logger := logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err := New(ctx, logger, nil, Options{Dir: dir})
+			if err != nil {
+				t.Fatal(err)
+			}
+			// store something, won't show up in the logs yet (they're calculated on startup only)
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`{"baz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/bar"), util.MustUnmarshalJSON([]byte(`{"quz": 2000}`))); err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected := []string{
+				`level=warning msg="no partitions configured"`,
+				`level=debug msg="partition /: key count: 0 (estimated size 0 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+
+			// re-open
+			buf = bytes.Buffer{}
+			logger = logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err = New(ctx, logger, nil, Options{Dir: dir})
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected = []string{
+				`level=debug msg="partition /: key count: 2 (estimated size 50 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+		})
+	})
+
+	t.Run("two partitions", func(t *testing.T) {
+		test.WithTempFS(nil, func(dir string) {
+			opts := Options{
+				Dir: dir,
+				Partitions: []storage.Path{
+					storage.MustParsePath("/foo"),
+					storage.MustParsePath("/bar"),
+				}}
+			buf := bytes.Buffer{}
+			logger := logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err := New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// store something, won't show up in the logs yet (they're calculated on startup only)
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/foo"), util.MustUnmarshalJSON([]byte(`{"baz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected := []string{
+				`level=debug msg="partition /bar: key count: 0 (estimated size 0 bytes)"`,
+				`level=debug msg="partition /foo: key count: 0 (estimated size 0 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+
+			// re-open
+			buf = bytes.Buffer{}
+			logger = logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err = New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected = []string{
+				`level=debug msg="partition /bar: key count: 0 (estimated size 0 bytes)"`,
+				`level=debug msg="partition /foo: key count: 1 (estimated size 21 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+		})
+	})
+
+	t.Run("patterned partitions", func(t *testing.T) {
+		test.WithTempFS(nil, func(dir string) {
+			opts := Options{Dir: dir,
+				Partitions: []storage.Path{
+					storage.MustParsePath("/foo/*/bar"),
+					storage.MustParsePath("/bar"),
+				}}
+			buf := bytes.Buffer{}
+			logger := logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err := New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// store something, won't show up in the logs yet (they're calculated on startup only)
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/foo/x/bar"), util.MustUnmarshalJSON([]byte(`{"baz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+			if err := storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/bar"), util.MustUnmarshalJSON([]byte(`{"quz": 1000}`))); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected := []string{
+				`level=debug msg="partition /bar: key count: 0 (estimated size 0 bytes)"`,
+				`level=debug msg="partition pattern /foo/*/bar: key count: 0 (estimated size 0 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+
+			// re-open
+			buf = bytes.Buffer{}
+			logger = logging.New()
+			logger.SetOutput(&buf)
+			logger.SetLevel(logging.Debug)
+			store, err = New(ctx, logger, nil, opts)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := store.Close(ctx); err != nil {
+				t.Fatal(err)
+			}
+
+			expected = []string{
+				`level=debug msg="partition /bar: key count: 1 (estimated size 21 bytes)"`,
+				`level=debug msg="partition /foo/x/bar (pattern /foo/*/bar): key count: 1 (estimated size 27 bytes)"`,
+			}
+			for _, exp := range expected {
+				if !strings.Contains(buf.String(), exp) {
+					t.Errorf("expected string %q not found in logs", exp)
+				}
+			}
+			if t.Failed() {
+				t.Log("log output: ", buf.String())
+			}
+		})
+	})
+}
diff --git a/storage/disk/example_test.go b/storage/disk/example_test.go
index 88fefbab13..cd3871ff8d 100644
--- a/storage/disk/example_test.go
+++ b/storage/disk/example_test.go
@@ -10,6 +10,7 @@ import (
 	"io/ioutil"
 	"os"
 
+	"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/util" @@ -33,7 +34,7 @@ func Example_store() { defer os.RemoveAll(dir) // Create a new disk-based store. - store, err := disk.New(ctx, disk.Options{ + store, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/authz/tenants"), @@ -63,7 +64,7 @@ func Example_store() { check(err) // Re-create the disk-based store using the same options. - store2, err := disk.New(ctx, disk.Options{ + store2, err := disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/authz/tenants"), diff --git a/storage/disk/metrics.go b/storage/disk/metrics.go new file mode 100644 index 0000000000..f7e53eba57 --- /dev/null +++ b/storage/disk/metrics.go @@ -0,0 +1,51 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // storage read transactions never delete or write + keysReadPerStoreRead = newHist("keys_read_per_store_read_txn", "How many database reads had to occur for a storage read transaction") + bytesReadPerStoreRead = newHist("key_bytes_read_per_store_read_txn", "How many bytes of data were read for a storage read transaction") + + keysReadPerStoreWrite = newHist("keys_read_per_store_write_txn", "How many database reads had to occur for a storage write transaction") + keysWrittenPerStoreWrite = newHist("keys_written_per_store_write_txn", "How many database writes had to occur for a storage write transaction") + keysDeletedPerStoreWrite = newHist("keys_deleted_per_store_write_txn", "How many database writes had to occur for a storage write transaction") + bytesReadPerStoreWrite = newHist("key_bytes_read_per_store_write_txn", "How many bytes of data were read for a storage write transaction") +) + +func initPrometheus(reg prometheus.Registerer) error { + for _, hist := range []prometheus.Histogram{ + keysReadPerStoreRead, + bytesReadPerStoreRead, + keysReadPerStoreWrite, + keysWrittenPerStoreWrite, + keysDeletedPerStoreWrite, + bytesReadPerStoreWrite, + } { + if err := reg.Register(hist); err != nil { + return err + } + } + return nil +} + +func newHist(name, desc string) prometheus.Histogram { + return prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: name, + Help: desc, + Buckets: prometheus.LinearBuckets(1, 1, 10), // TODO different buckets? exp? 
+ }) +} + +func forwardMetric(m map[string]interface{}, counter string, hist prometheus.Histogram) { + key := "counter_" + counter + if s, ok := m[key]; ok { + hist.Observe(float64(s.(uint64))) + } +} diff --git a/storage/disk/partition.go b/storage/disk/partition.go index adee4fe153..a10fc96405 100644 --- a/storage/disk/partition.go +++ b/storage/disk/partition.go @@ -29,7 +29,12 @@ func newPartitionTrie() *partitionTrie { func (p *partitionTrie) Find(path storage.Path) (int, *partitionTrie) { node := p for i, x := range path { - next, ok := node.partitions[x] + next, ok := node.partitions[pathWildcard] + if ok { + node = next + continue + } + next, ok = node.partitions[x] if !ok { return i + 1, nil } diff --git a/storage/disk/partition_test.go b/storage/disk/partition_test.go index 506b40533b..29fc830ca8 100644 --- a/storage/disk/partition_test.go +++ b/storage/disk/partition_test.go @@ -18,11 +18,13 @@ func TestPartitionTrie(t *testing.T) { storage.MustParsePath("/foo/bar"), storage.MustParsePath("/foo/baz/qux"), storage.MustParsePath("/corge"), + storage.MustParsePath("/tenants/*/bindings"), // wildcard in the middle + storage.MustParsePath("/users/*"), // wildcard at the end }) // Assert on counts... - if len(root.partitions) != 2 { - t.Fatal("expected root to contain two partitions") + if exp, act := 4, len(root.partitions); exp != act { + t.Fatalf("expected root to contain %d partitions, got %d", exp, act) } if len(root.partitions["foo"].partitions) != 2 { @@ -82,6 +84,22 @@ func TestPartitionTrie(t *testing.T) { path: "/deadbeef", wantIdx: 1, wantPtr: nil, + }, { + path: "/tenants/deadbeef/bindings/user01", + wantIdx: 4, + wantPtr: nil, + }, { + path: "/tenants/deadbeef/bindings", + wantIdx: 3, + wantPtr: root.partitions["tenants"].partitions["*"].partitions["bindings"], + }, { + path: "/tenants/deadbeef/foo", + wantIdx: 3, + wantPtr: nil, + }, { + path: "/users/deadbeef", + wantIdx: 2, + wantPtr: root.partitions["users"].partitions["*"], }, } diff --git a/storage/disk/paths.go b/storage/disk/paths.go index c052421e2b..04864087b5 100644 --- a/storage/disk/paths.go +++ b/storage/disk/paths.go @@ -11,6 +11,8 @@ import ( "github.com/open-policy-agent/opa/storage" ) +const pathWildcard = "*" + type pathMapper struct { dataPrefix string dataPrefixNoTrailingSlash string @@ -38,7 +40,7 @@ func (pm *pathMapper) PolicyID2Key(id string) []byte { } func (pm *pathMapper) DataKey2Path(key []byte) (storage.Path, error) { - p, ok := storage.ParsePath(string(key)) + p, ok := storage.ParsePathEscaped(string(key)) if !ok { return nil, &storage.Error{Code: storage.InternalErr, Message: fmt.Sprintf("corrupt key: %s", key)} } @@ -66,7 +68,7 @@ func (ps pathSet) IsDisjoint() bool { for i := range ps { for j := range ps { if i != j { - if ps[i].HasPrefix(ps[j]) || ps[j].HasPrefix(ps[i]) { + if hasPrefixWithWildcard(ps[i], ps[j]) { return false } } @@ -75,6 +77,23 @@ func (ps pathSet) IsDisjoint() bool { return true } +// hasPrefixWithWildcard returns true if p starts with other; respecting +// wildcards +func hasPrefixWithWildcard(p, other storage.Path) bool { + if len(other) > len(p) { + return false + } + for i := range other { + if p[i] == pathWildcard || other[i] == pathWildcard { + continue + } + if p[i] != other[i] { + return false + } + } + return true +} + func (ps pathSet) Diff(other pathSet) pathSet { diff := pathSet{} for _, x := range ps { diff --git a/storage/disk/paths_test.go b/storage/disk/paths_test.go new file mode 100644 index 0000000000..92993a2d89 --- /dev/null +++ 
b/storage/disk/paths_test.go @@ -0,0 +1,59 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package disk + +import ( + "testing" + + "github.com/open-policy-agent/opa/storage" +) + +func TestIsDisjoint(t *testing.T) { + paths := func(ps ...string) pathSet { + ret := make([]storage.Path, len(ps)) + for i := range ps { + ret[i] = storage.MustParsePath(ps[i]) + } + return ret + } + + for _, tc := range []struct { + note string + ps pathSet + overlapped bool + }{ + { + note: "simple disjoint", + ps: paths("/foo", "/bar", "/baz"), + }, + { + note: "simple overlapping", + ps: paths("/foo", "/foo/bar"), + overlapped: true, + }, + { + note: "three overlapping", + ps: paths("/fox", "/foo/bar", "/foo"), + overlapped: true, + }, + { + note: "wildcard overlapping, last", + ps: paths("/foo", "/foo/*"), + overlapped: true, + }, + { + note: "wildcard overlapping, middle", + ps: paths("/foo/bar/baz", "/foo/*/baz"), + overlapped: true, + }, + } { + t.Run(tc.note, func(t *testing.T) { + act := tc.ps.IsDisjoint() + if !tc.overlapped != act { + t.Errorf("path set: %v, disjoint == %v, expected %v", tc.ps, act, !tc.overlapped) + } + }) + } +} diff --git a/storage/disk/txn.go b/storage/disk/txn.go index cd62379841..f51f74763b 100644 --- a/storage/disk/txn.go +++ b/storage/disk/txn.go @@ -9,27 +9,52 @@ import ( "context" "encoding/json" "fmt" + "strconv" badger "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/storage/internal/errors" "github.com/open-policy-agent/opa/storage/internal/ptr" "github.com/open-policy-agent/opa/util" ) +const ( + readValueBytesCounter = "disk_read_bytes" + readKeysCounter = "disk_read_keys" + writtenKeysCounter = "disk_written_keys" + deletedKeysCounter = "disk_deleted_keys" + + commitTimer = "disk_commit" + readTimer = "disk_read" + writeTimer = "disk_write" +) + type transaction struct { - underlying *badger.Txn // handle for the underlying badger transaction - partitions *partitionTrie // index for partitioning structure in underlying store - pm *pathMapper // used for mapping between logical storage paths and actual storage keys - db *Store // handle for the database this transaction was created on - xid uint64 // unique id for this transaction - stale bool // bit to indicate if the transaction was already aborted/committed - write bool // bit to indicate if the transaction may perform writes - context *storage.Context // context supplied by the caller to be included in triggers + underlying *badger.Txn // handle for the underlying badger transaction + partitions *partitionTrie // index for partitioning structure in underlying store + pm *pathMapper // used for mapping between logical storage paths and actual storage keys + db *Store // handle for the database this transaction was created on + xid uint64 // unique id for this transaction + stale bool // bit to indicate if the transaction was already aborted/committed + write bool // bit to indicate if the transaction may perform writes + event storage.TriggerEvent // constructed as we go, supplied by the caller to be included in triggers + metrics metrics.Metrics // per-transaction metrics } func newTransaction(xid uint64, write bool, underlying *badger.Txn, context *storage.Context, pm *pathMapper, trie *partitionTrie, db *Store) *transaction { + + // Even if the caller is not interested, these will contribute 
+	// to the prometheus metrics on commit.
+	var m metrics.Metrics
+	if context != nil {
+		m = context.Metrics()
+	}
+	if m == nil {
+		m = metrics.New()
+	}
+
 	return &transaction{
 		underlying: underlying,
 		partitions: trie,
@@ -38,7 +63,10 @@ func newTransaction(xid uint64, write bool, underlying *badger.Txn, context *sto
 		xid:        xid,
 		stale:      false,
 		write:      write,
-		context:    context,
+		event: storage.TriggerEvent{
+			Context: context,
+		},
+		metrics: m,
 	}
 }
 
@@ -46,16 +74,31 @@ func (txn *transaction) ID() uint64 {
 	return txn.xid
 }
 
+// Commit will commit the underlying transaction, and forward the per-transaction
+// metrics into prometheus metrics.
+// NOTE(sr): aborted transactions are not measured
 func (txn *transaction) Commit(context.Context) (storage.TriggerEvent, error) {
-	// TODO(tsandall): This implementation does not provide any data or policy
-	// events because they are of minimal value given that the transaction
-	// cannot be used for reads once the commit finishes. This differs from the
-	// in-memory store.
-	//
-	// We should figure out how to remove the code in the plugin manager that
-	// performs reads on committed transactions.
 	txn.stale = true
-	return storage.TriggerEvent{Context: txn.context}, wrapError(txn.underlying.Commit())
+	txn.metrics.Timer(commitTimer).Start()
+	err := wrapError(txn.underlying.Commit())
+	txn.metrics.Timer(commitTimer).Stop()
+
+	if err != nil {
+		return txn.event, err
+	}
+
+	m := txn.metrics.All()
+	if txn.write {
+		forwardMetric(m, readKeysCounter, keysReadPerStoreWrite)
+		forwardMetric(m, writtenKeysCounter, keysWrittenPerStoreWrite)
+		forwardMetric(m, deletedKeysCounter, keysDeletedPerStoreWrite)
+		forwardMetric(m, readValueBytesCounter, bytesReadPerStoreWrite)
+	} else {
+		forwardMetric(m, readKeysCounter, keysReadPerStoreRead)
+		forwardMetric(m, readValueBytesCounter, bytesReadPerStoreRead)
+	}
+	return txn.event, nil
 }
 
 func (txn *transaction) Abort(context.Context) {
@@ -63,12 +106,13 @@
 	txn.underlying.Discard()
 }
 
-func (txn *transaction) Read(_ context.Context, path storage.Path) (interface{}, error) {
+func (txn *transaction) Read(ctx context.Context, path storage.Path) (interface{}, error) {
+	txn.metrics.Timer(readTimer).Start()
+	defer txn.metrics.Timer(readTimer).Stop()
 
 	i, node := txn.partitions.Find(path)
 
 	if node == nil {
-
 		key, err := txn.pm.DataPath2Key(path[:i])
 		if err != nil {
 			return nil, err
 		}
@@ -87,10 +131,10 @@ func (txn *transaction) Read(_ context.Context, path storage.Path) (interface{},
 		return nil, err
 	}
 
-	return txn.readMultiple(i, key)
+	return txn.readMultiple(ctx, i, key)
 }
 
-func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, error) {
+func (txn *transaction) readMultiple(ctx context.Context, offset int, prefix []byte) (interface{}, error) {
 
 	result := map[string]interface{}{}
 
@@ -98,8 +142,14 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er
 	defer it.Close()
 
 	var keybuf, valbuf []byte
+	var count uint64
 
 	for it.Rewind(); it.Valid(); it.Next() {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
+		count++
 		keybuf = it.Item().KeyCopy(keybuf)
 
 		path, err := txn.pm.DataKey2Path(keybuf)
@@ -111,6 +161,7 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er
 		if err != nil {
 			return nil, wrapError(err)
 		}
+		txn.metrics.Counter(readValueBytesCounter).Add(uint64(len(valbuf)))
 
 		var value interface{}
 		if err := deserialize(valbuf, &value); err
!= nil { @@ -135,6 +186,8 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er node[path[len(path)-1]] = value } + txn.metrics.Counter(readKeysCounter).Add(count) + if len(result) == 0 { return nil, errNotFound } @@ -143,6 +196,7 @@ func (txn *transaction) readMultiple(offset int, prefix []byte) (interface{}, er } func (txn *transaction) readOne(key []byte) (interface{}, error) { + txn.metrics.Counter(readKeysCounter).Add(1) item, err := txn.underlying.Get(key) if err != nil { @@ -155,6 +209,7 @@ func (txn *transaction) readOne(key []byte) (interface{}, error) { var val interface{} err = item.Value(func(bs []byte) error { + txn.metrics.Counter(readValueBytesCounter).Add(uint64(len(bs))) return deserialize(bs, &val) }) @@ -164,66 +219,49 @@ func (txn *transaction) readOne(key []byte) (interface{}, error) { type update struct { key []byte value []byte + data interface{} delete bool } -// errTxnTooBigErrorHandler checks if the passed error was caused by a transaction -// that was _too big_. If so, it attempts to commit the transaction and opens a new one. -// See https://dgraph.io/docs/badger/get-started/#read-write-transactions -func errTxnTooBigErrorHandler(txn *transaction, err error) error { - errSetCommit := txn.underlying.Commit() - if errSetCommit != nil { - return wrapError(errSetCommit) - } - txn.underlying = txn.db.db.NewTransaction(true) - return nil -} - -// !!!  infinite recursion only if infinite txn too big error occurred -func deleteWithErrTxnTooBigErrorHandling(txn *transaction, u *update) error { - err := txn.underlying.Delete(u.key) - if err == badger.ErrTxnTooBig { - if txnErr := errTxnTooBigErrorHandler(txn, err); txnErr != nil { - return txnErr - } - return deleteWithErrTxnTooBigErrorHandling(txn, u) - } - return wrapError(err) -} - -// !!! infinite recursion only if infinite txn too big error occurred -func setWithErrTxnTooBigErrorHandling(txn *transaction, u *update) error { - err := txn.underlying.Set(u.key, u.value) - if err == badger.ErrTxnTooBig { - if txnErr := errTxnTooBigErrorHandler(txn, err); txnErr != nil { - return txnErr - } - return setWithErrTxnTooBigErrorHandling(txn, u) - } - return wrapError(err) -} - func (txn *transaction) Write(_ context.Context, op storage.PatchOp, path storage.Path, value interface{}) error { + txn.metrics.Timer(writeTimer).Start() + defer txn.metrics.Timer(writeTimer).Stop() + updates, err := txn.partitionWrite(op, path, value) if err != nil { return err } for _, u := range updates { if u.delete { - if err := deleteWithErrTxnTooBigErrorHandling(txn, &u); err != nil { + if err := txn.underlying.Delete(u.key); err != nil { return err } + txn.metrics.Counter(deletedKeysCounter).Add(1) } else { - if err := setWithErrTxnTooBigErrorHandling(txn, &u); err != nil { + if err := txn.underlying.Set(u.key, u.value); err != nil { return err } + txn.metrics.Counter(writtenKeysCounter).Add(1) } + + txn.event.Data = append(txn.event.Data, storage.DataEvent{ + Path: path, // ? 
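+			// Note: the event records the logical write path; when a write
+			// fans out over a partition, the individual keys can differ.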
+ Data: u.data, // nil if delete == true + Removed: u.delete, + }) } return nil } func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, value interface{}) ([]update, error) { + if op == storage.RemoveOp && len(path) == 0 { + return nil, &storage.Error{ + Code: storage.InvalidPatchErr, + Message: "root cannot be removed", + } + } + i, node := txn.partitions.Find(path) if node == nil { @@ -241,11 +279,11 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va } curr, err := txn.readOne(key) - if err != nil { + if err != nil && err != errNotFound { return nil, err } - modified, err := patch(curr, op, path[i:], value) + modified, err := patch(curr, op, path, i, value) if err != nil { return nil, err } @@ -254,8 +292,7 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va if err != nil { return nil, err } - - return []update{{key: key, value: bs}}, nil + return []update{{key: key, value: bs, data: modified}}, nil } key, err := txn.pm.DataPrefix2Key(path) @@ -270,6 +307,7 @@ func (txn *transaction) partitionWrite(op storage.PatchOp, path storage.Path, va for it.Rewind(); it.Valid(); it.Next() { updates = append(updates, update{key: it.Item().KeyCopy(nil), delete: true}) + txn.metrics.Counter(readKeysCounter).Add(1) } if op == storage.RemoveOp { @@ -291,23 +329,27 @@ func (txn *transaction) partitionWriteMultiple(node *partitionTrie, path storage for k, v := range obj { child := append(path, k) next, ok := node.partitions[k] - if !ok { - key, err := txn.pm.DataPath2Key(child) - if err != nil { - return nil, err - } - bs, err := serialize(v) - if err != nil { - return nil, err - } - result = append(result, update{key: key, value: bs}) - } else { + if !ok { // try wildcard + next, ok = node.partitions[pathWildcard] + } + if ok { var err error result, err = txn.partitionWriteMultiple(next, child, v, result) if err != nil { return nil, err } + continue + } + + key, err := txn.pm.DataPath2Key(child) + if err != nil { + return nil, err + } + bs, err := serialize(v) + if err != nil { + return nil, err } + result = append(result, update{key: key, value: bs, data: v}) } return result, nil @@ -328,10 +370,10 @@ func (txn *transaction) partitionWriteOne(op storage.PatchOp, path storage.Path, return nil, err } - return []update{{key: key, value: val}}, nil + return []update{{key: key, value: val, data: value}}, nil } -func (txn *transaction) ListPolicies(context.Context) ([]string, error) { +func (txn *transaction) ListPolicies(ctx context.Context) ([]string, error) { var result []string @@ -344,6 +386,10 @@ func (txn *transaction) ListPolicies(context.Context) ([]string, error) { var key []byte for it.Rewind(); it.Valid(); it.Next() { + if ctx.Err() != nil { + return nil, ctx.Err() + } + txn.metrics.Counter(readKeysCounter).Add(1) item := it.Item() key = item.KeyCopy(key) result = append(result, txn.pm.PolicyKey2ID(key)) @@ -353,20 +399,41 @@ func (txn *transaction) ListPolicies(context.Context) ([]string, error) { } func (txn *transaction) GetPolicy(_ context.Context, id string) ([]byte, error) { + txn.metrics.Counter(readKeysCounter).Add(1) item, err := txn.underlying.Get(txn.pm.PolicyID2Key(id)) if err != nil { + if err == badger.ErrKeyNotFound { + return nil, errors.NewNotFoundErrorf("policy id %q", id) + } return nil, err } bs, err := item.ValueCopy(nil) + txn.metrics.Counter(readValueBytesCounter).Add(uint64(len(bs))) return bs, wrapError(err) } func (txn *transaction) UpsertPolicy(_ context.Context, id string, bs 
[]byte) error { - return wrapError(txn.underlying.Set(txn.pm.PolicyID2Key(id), bs)) + if err := txn.underlying.Set(txn.pm.PolicyID2Key(id), bs); err != nil { + return wrapError(err) + } + txn.metrics.Counter(writtenKeysCounter).Add(1) + txn.event.Policy = append(txn.event.Policy, storage.PolicyEvent{ + ID: id, + Data: bs, + }) + return nil } func (txn *transaction) DeletePolicy(_ context.Context, id string) error { - return wrapError(txn.underlying.Delete(txn.pm.PolicyID2Key(id))) + if err := txn.underlying.Delete(txn.pm.PolicyID2Key(id)); err != nil { + return wrapError(err) + } + txn.metrics.Counter(deletedKeysCounter).Add(1) + txn.event.Policy = append(txn.event.Policy, storage.PolicyEvent{ + ID: id, + Removed: true, + }) + return nil } func serialize(value interface{}) ([]byte, error) { @@ -379,79 +446,99 @@ func deserialize(bs []byte, result interface{}) error { return wrapError(d.Decode(&result)) } -func patch(data interface{}, op storage.PatchOp, path storage.Path, value interface{}) (interface{}, error) { - - if len(path) == 0 { +func patch(data interface{}, op storage.PatchOp, path storage.Path, idx int, value interface{}) (interface{}, error) { + if idx == len(path) { panic("unreachable") } // Base case: mutate the data value in-place. - if len(path) == 1 { + if len(path) == idx+1 { // last element switch x := data.(type) { case map[string]interface{}: + key := path[len(path)-1] switch op { case storage.RemoveOp: - key := path[len(path)-1] if _, ok := x[key]; !ok { return nil, errors.NewNotFoundError(path) } delete(x, key) return x, nil case storage.ReplaceOp: - key := path[len(path)-1] if _, ok := x[key]; !ok { return nil, errors.NewNotFoundError(path) } x[key] = value return x, nil case storage.AddOp: - key := path[len(path)-1] x[key] = value return x, nil } case []interface{}: - if path[0] == "-" { - return append(x, value), nil - } - idx, err := ptr.ValidateArrayIndex(x, path[0], path) - if err != nil { - return nil, err + switch op { + case storage.AddOp: + if path[idx] == "-" || path[idx] == strconv.Itoa(len(x)) { + return append(x, value), nil + } + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) + if err != nil { + return nil, err + } + // insert at i + return append(x[:i], append([]interface{}{value}, x[i:]...)...), nil + case storage.ReplaceOp: + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) + if err != nil { + return nil, err + } + x[i] = value + return x, nil + case storage.RemoveOp: + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx, path) + if err != nil { + return nil, err + } + return append(x[:i], x[i+1:]...), nil // i is skipped + default: + panic("unreachable") } - x[idx] = value - return x, nil + case nil: // data wasn't set before + return map[string]interface{}{path[idx]: value}, nil default: return nil, errors.NewNotFoundError(path) } } // Recurse on the value located at the next part of the path. 
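+	// The full path plus an index is threaded through (rather than
+	// re-slicing the path) so that errors can report the complete path,
+	// and the nil cases can create missing intermediate objects.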
- key := path[0] + key := path[idx] switch x := data.(type) { case map[string]interface{}: - child, ok := x[key] - if !ok { - return nil, errors.NewNotFoundError(path) - } - modified, err := patch(child, op, path[1:], value) + modified, err := patch(x[key], op, path, idx+1, value) if err != nil { return nil, err } x[key] = modified return x, nil case []interface{}: - idx, err := ptr.ValidateArrayIndex(x, path[0], path) + i, err := ptr.ValidateArrayIndexForWrite(x, path[idx], idx+1, path) if err != nil { return nil, err } - modified, err := patch(x[idx], op, path[1:], value) + modified, err := patch(x[i], op, path, idx+1, value) if err != nil { return nil, err } - x[idx] = modified + x[i] = modified return x, nil + case nil: // data isn't there yet + y := make(map[string]interface{}, 1) + modified, err := patch(nil, op, path, idx+1, value) + if err != nil { + return nil, err + } + y[key] = modified + return y, nil default: return nil, errors.NewNotFoundError(path) } - } diff --git a/storage/disk/txn_test.go b/storage/disk/txn_test.go index fa319f3023..2f2d4906f0 100644 --- a/storage/disk/txn_test.go +++ b/storage/disk/txn_test.go @@ -6,13 +6,14 @@ package disk import ( "context" + "errors" + "fmt" "math/rand" - "strconv" - "strings" "testing" + "github.com/dgraph-io/badger/v3" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/storage" - "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/util/test" ) @@ -26,57 +27,54 @@ func randomString(n int) string { return string(s) } -func fixture(nbKeys int) interface{} { - i := 1 - var keyValues = []string{} - for i <= nbKeys { - keyValues = append(keyValues, "\""+strconv.Itoa(i)+randomString(4)+"\": \""+randomString(3)+"\"") - i++ +func fixture(n int) map[string]interface{} { + foo := map[string]string{} + for i := 0; i < n; i++ { + foo[fmt.Sprintf(`"%d%s"`, i, randomString(4))] = randomString(3) } - jsonBytes := []byte(`{"foo":{` + strings.Join(keyValues, ",") + `}}`) - return util.MustUnmarshalJSON(jsonBytes) + return map[string]interface{}{"foo": foo} } -func TestSetTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) { - test.WithTempFS(map[string]string{}, func(dir string) { +func TestSetTxnIsTooBigToFitIntoOneRequestWhenUseDiskStoreReturnsError(t *testing.T) { + test.WithTempFS(nil, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo"), }}) if err != nil { t.Fatal(err) } - nbKeys := 140_000 // !!! 
135_000 it's ok, but 140_000 not + nbKeys := 140_000 // 135_000 is ok, but 140_000 not jsonFixture := fixture(nbKeys) - errTxn := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { - - errTxnWrite := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture) - if errTxnWrite != nil { - t.Fatal(errTxnWrite) + err = storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { + err := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - return nil + return err }) - if errTxn != nil { - t.Fatal(errTxn) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - result, errRead := storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) - if errRead != nil { - t.Fatal(errRead) + _, err = storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) + var notFound *storage.Error + ok := errors.As(err, ¬Found) + if !ok { + t.Errorf("expected %T, got %v", notFound, err) } - actualNbKeys := len(result.(map[string]interface{})) - if nbKeys != actualNbKeys { - t.Fatalf("Expected %d keys, read %d", nbKeys, actualNbKeys) + if exp, act := storage.NotFoundErr, notFound.Code; exp != act { + t.Errorf("expected code %v, got %v", exp, act) } }) } func TestDeleteTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) { - test.WithTempFS(map[string]string{}, func(dir string) { + test.WithTempFS(nil, func(dir string) { ctx := context.Background() - s, err := New(ctx, Options{Dir: dir, Partitions: []storage.Path{ + s, err := New(ctx, logging.NewNoOpLogger(), nil, Options{Dir: dir, Partitions: []storage.Path{ storage.MustParsePath("/foo"), }}) if err != nil { @@ -84,36 +82,45 @@ func TestDeleteTxnIsTooBigToFitIntoOneRequestWhenUseDiskStore(t *testing.T) { } nbKeys := 200_000 jsonFixture := fixture(nbKeys) - errTxn := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { + foo := jsonFixture["foo"].(map[string]string) - errTxnWrite := s.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/"), jsonFixture) - if errTxnWrite != nil { - t.Fatal(errTxnWrite) + // Write data in increments so we don't step over the too-large-txn limit + for k, v := range foo { + err := storage.WriteOne(ctx, s, storage.AddOp, storage.MustParsePath("/foo/"+k), v) + if err != nil { + t.Fatal(err) } - return nil - }) - if errTxn != nil { - t.Fatal(errTxn) } - errTxnD := storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { - errTxnWrite := s.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath("/foo"), jsonFixture) - if errTxnWrite != nil { - t.Fatal(errTxnWrite) + // check expected state + res, err := storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) + if err != nil { + t.Fatal(err) + } + if exp, act := nbKeys, len(res.(map[string]interface{})); exp != act { + t.Fatalf("expected %d keys, read %d", exp, act) + } + + err = storage.Txn(ctx, s, storage.WriteParams, func(txn storage.Transaction) error { + err := s.Write(ctx, txn, storage.RemoveOp, storage.MustParsePath("/foo"), jsonFixture) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - return nil + return err }) - if errTxnD != nil { - t.Fatal(errTxnD) + if !errors.Is(err, badger.ErrTxnTooBig) { + t.Errorf("expected %v, got %v", badger.ErrTxnTooBig, err) } - results, errRead := storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) - if 
!storage.IsNotFound(errRead) { - t.Fatal(errRead) + // check expected state again + res, err = storage.ReadOne(ctx, s, storage.MustParsePath("/foo")) + if err != nil { + t.Fatal(err) } - if results != nil { - t.Fatalf("Unexpected results %v", results) + if exp, act := nbKeys, len(res.(map[string]interface{})); exp != act { + t.Fatalf("expected %d keys, read %d", exp, act) } + }) } diff --git a/storage/errors.go b/storage/errors.go index d83b275ac7..2ea68fca34 100644 --- a/storage/errors.go +++ b/storage/errors.go @@ -99,7 +99,7 @@ func IsIndexingNotSupported(error) bool { return false } func writeConflictError(path Path) *Error { return &Error{ Code: WriteConflictErr, - Message: fmt.Sprint(path), + Message: path.String(), } } diff --git a/storage/inmem/inmem_test.go b/storage/inmem/inmem_test.go index 1356bcfa03..8902d751fa 100644 --- a/storage/inmem/inmem_test.go +++ b/storage/inmem/inmem_test.go @@ -610,7 +610,7 @@ func TestInMemoryContext(t *testing.T) { } _, err = store.Register(ctx, txn, storage.TriggerConfig{ - OnCommit: func(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) { + OnCommit: func(_ context.Context, _ storage.Transaction, event storage.TriggerEvent) { if event.Context.Get("foo") != "bar" { t.Fatalf("Expected foo/bar in context but got: %+v", event.Context) } else if event.Context.Get("deadbeef") != nil { diff --git a/storage/inmem/txn.go b/storage/inmem/txn.go index 5254523ec4..440ec9f6ac 100644 --- a/storage/inmem/txn.go +++ b/storage/inmem/txn.go @@ -311,20 +311,21 @@ func newUpdateArray(data []interface{}, op storage.PatchOp, path storage.Path, i return nil, err } - if op == storage.AddOp { + switch op { + case storage.AddOp: cpy := make([]interface{}, len(data)+1) copy(cpy[:pos], data[:pos]) copy(cpy[pos+1:], data[pos:]) cpy[pos] = value return &update{path[:len(path)-1], false, cpy}, nil - } else if op == storage.RemoveOp { + case storage.RemoveOp: cpy := make([]interface{}, len(data)-1) copy(cpy[:pos], data[:pos]) copy(cpy[pos:], data[pos+1:]) return &update{path[:len(path)-1], false, cpy}, nil - } else { + default: cpy := make([]interface{}, len(data)) copy(cpy, data) cpy[pos] = value diff --git a/storage/interface.go b/storage/interface.go index 7502e7babc..38252f32da 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -6,6 +6,8 @@ package storage import ( "context" + + "github.com/open-policy-agent/opa/metrics" ) // Transaction defines the interface that identifies a consistent snapshot over @@ -77,6 +79,27 @@ func (ctx *Context) Put(key, value interface{}) { ctx.values[key] = value } +var metricsKey = struct{}{} + +// WithMetrics allows passing metrics via the Context. +// It puts the metrics object in the ctx, and returns the same +// ctx (not a copy) for convenience. +func (ctx *Context) WithMetrics(m metrics.Metrics) *Context { + ctx.values[metricsKey] = m + return ctx +} + +// Metrics() allows using a Context's metrics. Returns nil if metrics +// were not attached to the Context. +func (ctx *Context) Metrics() metrics.Metrics { + if m, ok := ctx.values[metricsKey]; ok { + if met, ok := m.(metrics.Metrics); ok { + return met + } + } + return nil +} + // WriteParams specifies the TransactionParams for a write transaction. 
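The new `WithMetrics`/`Metrics` pair threads a `metrics.Metrics` instance through the `storage.Context`, which is how the disk store can account for the `timer_disk_*_ns` and `counter_disk_*` figures while a transaction runs. A minimal usage sketch, assuming the transaction is opened with a `TransactionParams` value whose `Context` field carries the storage context (the same channel the trigger machinery uses); the function itself is illustrative, not part of this change:

```go
import (
	"context"
	"fmt"

	"github.com/open-policy-agent/opa/metrics"
	"github.com/open-policy-agent/opa/storage"
)

// readWithMetrics reads path and prints whatever storage-level metrics the
// store recorded along the way (for the disk store: read timers and key
// counters).
func readWithMetrics(ctx context.Context, store storage.Store, path storage.Path) (interface{}, error) {
	m := metrics.New()
	sctx := storage.NewContext().WithMetrics(m)

	txn, err := store.NewTransaction(ctx, storage.TransactionParams{Context: sctx})
	if err != nil {
		return nil, err
	}
	defer store.Abort(ctx, txn)

	v, err := store.Read(ctx, txn, path)
	if err != nil {
		return nil, err
	}
	fmt.Println(m.All()) // e.g. counter_disk_read_keys, timer_disk_read_ns
	return v, nil
}
```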
var WriteParams = TransactionParams{ Write: true, diff --git a/storage/internal/errors/errors.go b/storage/internal/errors/errors.go index 74792f63e4..0bba74b907 100644 --- a/storage/internal/errors/errors.go +++ b/storage/internal/errors/errors.go @@ -30,3 +30,10 @@ func NewNotFoundErrorf(f string, a ...interface{}) *storage.Error { Message: msg, } } + +func NewWriteConflictError(p storage.Path) *storage.Error { + return &storage.Error{ + Code: storage.WriteConflictErr, + Message: p.String(), + } +} diff --git a/storage/internal/ptr/ptr.go b/storage/internal/ptr/ptr.go index 6778805faa..56772f7976 100644 --- a/storage/internal/ptr/ptr.go +++ b/storage/internal/ptr/ptr.go @@ -37,12 +37,32 @@ func Ptr(data interface{}, path storage.Path) (interface{}, error) { } func ValidateArrayIndex(arr []interface{}, s string, path storage.Path) (int, error) { - idx, err := strconv.Atoi(s) - if err != nil { + idx, ok := isInt(s) + if !ok { return 0, errors.NewNotFoundErrorWithHint(path, errors.ArrayIndexTypeMsg) } - if idx < 0 || idx >= len(arr) { + return inRange(idx, arr, path) +} + +// ValidateArrayIndexForWrite also checks that `s` is a valid way to address an +// array element like `ValidateArrayIndex`, but returns a `resource_conflict` error +// if it is not. +func ValidateArrayIndexForWrite(arr []interface{}, s string, i int, path storage.Path) (int, error) { + idx, ok := isInt(s) + if !ok { + return 0, errors.NewWriteConflictError(path[:i-1]) + } + return inRange(idx, arr, path) +} + +func isInt(s string) (int, bool) { + idx, err := strconv.Atoi(s) + return idx, err == nil +} + +func inRange(i int, arr []interface{}, path storage.Path) (int, error) { + if i < 0 || i >= len(arr) { return 0, errors.NewNotFoundErrorWithHint(path, errors.OutOfRangeMsg) } - return idx, nil + return i, nil } diff --git a/test/authz/authz_bench_test.go b/test/authz/authz_bench_test.go index 9311ca8bc1..dc6cf9c75e 100644 --- a/test/authz/authz_bench_test.go +++ b/test/authz/authz_bench_test.go @@ -6,16 +6,25 @@ package authz import ( "context" + "io/ioutil" + "os" "testing" "github.com/open-policy-agent/opa/ast" + "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/rego" "github.com/open-policy-agent/opa/storage" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/storage/inmem" ) func BenchmarkAuthzForbidAuthn(b *testing.B) { - runAuthzBenchmark(b, ForbidIdentity, 10) + b.Run("inmem", func(b *testing.B) { + runAuthzBenchmark(b, ForbidIdentity, 10) + }) + b.Run("disk", func(b *testing.B) { + runAuthzBenchmark(b, ForbidIdentity, 10, true) + }) } func BenchmarkAuthzForbidPath(b *testing.B) { @@ -38,7 +47,7 @@ func BenchmarkAuthzAllow1000Paths(b *testing.B) { runAuthzBenchmark(b, Allow, 1000) } -func runAuthzBenchmark(b *testing.B, mode InputMode, numPaths int) { +func runAuthzBenchmark(b *testing.B, mode InputMode, numPaths int, extras ...bool) { profile := DataSetProfile{ NumTokens: 1000, @@ -47,7 +56,36 @@ func runAuthzBenchmark(b *testing.B, mode InputMode, numPaths int) { ctx := context.Background() data := GenerateDataset(profile) - store := inmem.NewFromObject(data) + + useDisk := false + if len(extras) > 0 { + useDisk = extras[0] + } + + var store storage.Store + if useDisk { + rootDir, err := ioutil.TempDir("", "test-e2e-bench-disk") + if err != nil { + panic(err) + } + + defer os.RemoveAll(rootDir) + store, err = disk.New(ctx, logging.NewNoOpLogger(), nil, disk.Options{ + Dir: rootDir, + Partitions: nil, + }) + if err != nil { + b.Fatal(err) + } + + err = 
storage.WriteOne(ctx, store, storage.AddOp, storage.Path{}, data) + if err != nil { + b.Fatal(err) + } + } else { + store = inmem.NewFromObject(data) + } + txn := storage.NewTransactionOrDie(ctx, store) compiler := ast.NewCompiler() module := ast.MustParseModule(Policy) diff --git a/test/e2e/authz/authz_bench_integration_test.go b/test/e2e/authz/authz_bench_integration_test.go index 92605b2e54..5eb82a3379 100644 --- a/test/e2e/authz/authz_bench_integration_test.go +++ b/test/e2e/authz/authz_bench_integration_test.go @@ -24,7 +24,8 @@ func TestMain(m *testing.M) { flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - + disk, cleanup := diskStorage() + testServerParams.DiskStorage = disk var err error testRuntime, err = e2e.NewTestRuntime(testServerParams) if err != nil { @@ -32,6 +33,11 @@ func TestMain(m *testing.M) { } errc := testRuntime.RunTests(m) + if cleanup != nil { + if err := cleanup(); err != nil { + panic(err) + } + } os.Exit(errc) } diff --git a/test/e2e/authz/disk.go b/test/e2e/authz/disk.go new file mode 100644 index 0000000000..4bb8592811 --- /dev/null +++ b/test/e2e/authz/disk.go @@ -0,0 +1,25 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +//go:build bench_disk +// +build bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import ( + "io/ioutil" + "os" + + "github.com/open-policy-agent/opa/storage/disk" +) + +func diskStorage() (*disk.Options, func() error) { + dir, err := ioutil.TempDir("", "disk-store") + if err != nil { + panic(err) + } + + return &disk.Options{Dir: dir, Partitions: nil}, func() error { return os.RemoveAll(dir) } +} diff --git a/test/e2e/authz/nodisk.go b/test/e2e/authz/nodisk.go new file mode 100644 index 0000000000..8849441bd1 --- /dev/null +++ b/test/e2e/authz/nodisk.go @@ -0,0 +1,15 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. 
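The `disk.go`/`nodisk.go` pair gates the disk-backed variant of these end-to-end benchmarks behind a build tag: without `bench_disk`, `diskStorage()` returns `nil` options and the test runtime keeps its in-memory store; with the tag, it hands the test server a temporary Badger directory plus a cleanup function. Assuming the package paths introduced in this diff, an invocation along these lines should exercise the disk-backed path:

```
$ go test -tags bench_disk -bench=. ./test/e2e/authz/...
```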
+ +//go:build !bench_disk +// +build !bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import "github.com/open-policy-agent/opa/storage/disk" + +func diskStorage() (*disk.Options, func() error) { + return nil, nil +} diff --git a/test/e2e/concurrency/concurrency_test.go b/test/e2e/concurrency/concurrency_test.go index 95825725d3..e084af0d86 100644 --- a/test/e2e/concurrency/concurrency_test.go +++ b/test/e2e/concurrency/concurrency_test.go @@ -2,6 +2,7 @@ package concurrency import ( "flag" + "io/ioutil" "os" "runtime" "strings" @@ -9,6 +10,7 @@ import ( "testing" "github.com/open-policy-agent/opa/server/types" + "github.com/open-policy-agent/opa/storage/disk" "github.com/open-policy-agent/opa/test/e2e" ) @@ -18,13 +20,26 @@ func TestMain(m *testing.M) { flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - var err error - testRuntime, err = e2e.NewTestRuntimeWithOpts(e2e.TestRuntimeOpts{}, testServerParams) + dir, err := ioutil.TempDir("", "disk-store") if err != nil { - os.Exit(1) + panic(err) + } + defer func() { os.RemoveAll(dir) }() + + for _, opts := range []*disk.Options{ + nil, + {Dir: dir, Partitions: nil}, + } { + var err error + testServerParams.DiskStorage = opts + testRuntime, err = e2e.NewTestRuntime(testServerParams) + if err != nil { + panic(err) + } + if ec := testRuntime.RunTests(m); ec != 0 { + os.Exit(ec) + } } - - os.Exit(testRuntime.RunTests(m)) } func TestConcurrencyGetV1Data(t *testing.T) { diff --git a/test/e2e/shutdown/shutdown_test.go b/test/e2e/shutdown/shutdown_test.go index a3e5580734..8579d8e2b4 100644 --- a/test/e2e/shutdown/shutdown_test.go +++ b/test/e2e/shutdown/shutdown_test.go @@ -43,7 +43,7 @@ func TestShutdownWaitPeriod(t *testing.T) { time.Sleep(1500 * time.Millisecond) - // Ensure that OPA i still running + // Ensure that OPA is still running err = testRuntime.HealthCheck(testRuntime.URL()) if err != nil { t.Fatalf("Expected health endpoint to be up but got:\n\n%v", err) diff --git a/test/e2e/wasm/authz/authz_bench_integration_test.go b/test/e2e/wasm/authz/authz_bench_integration_test.go index 9c1923d497..bf67e58ffd 100644 --- a/test/e2e/wasm/authz/authz_bench_integration_test.go +++ b/test/e2e/wasm/authz/authz_bench_integration_test.go @@ -2,6 +2,7 @@ // Use of this source code is governed by an Apache2 // license that can be found in the LICENSE file. +//go:build opa_wasm // +build opa_wasm package authz @@ -36,7 +37,8 @@ func TestMain(m *testing.M) { flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - + var cleanup func() error + testServerParams.DiskStorage, cleanup = diskStorage() var err error testRuntime, err = e2e.NewTestRuntime(testServerParams) if err != nil { @@ -93,6 +95,11 @@ func TestMain(m *testing.M) { } errc := testRuntime.RunTests(m) + if errc == 0 && cleanup != nil { + if err := cleanup(); err != nil { + panic(err) + } + } os.Exit(errc) } diff --git a/test/e2e/wasm/authz/disk.go b/test/e2e/wasm/authz/disk.go new file mode 100644 index 0000000000..4bb8592811 --- /dev/null +++ b/test/e2e/wasm/authz/disk.go @@ -0,0 +1,25 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. 
+ +//go:build bench_disk +// +build bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import ( + "io/ioutil" + "os" + + "github.com/open-policy-agent/opa/storage/disk" +) + +func diskStorage() (*disk.Options, func() error) { + dir, err := ioutil.TempDir("", "disk-store") + if err != nil { + panic(err) + } + + return &disk.Options{Dir: dir, Partitions: nil}, func() error { return os.RemoveAll(dir) } +} diff --git a/test/e2e/wasm/authz/nodisk.go b/test/e2e/wasm/authz/nodisk.go new file mode 100644 index 0000000000..8849441bd1 --- /dev/null +++ b/test/e2e/wasm/authz/nodisk.go @@ -0,0 +1,15 @@ +// Copyright 2022 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +//go:build !bench_disk +// +build !bench_disk + +// nolint: deadcode,unused // build tags confuse these linters +package authz + +import "github.com/open-policy-agent/opa/storage/disk" + +func diskStorage() (*disk.Options, func() error) { + return nil, nil +}
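Stepping back to the `storage/disk` change that opens this section: `patch` now carries the full path plus a cursor index instead of re-slicing `path[1:]`, and its new `nil` case materializes missing intermediate objects, so an `add` to a path whose parents do not exist yet builds the nested maps on the way down instead of failing with not-found. A standalone sketch of that recursion shape, simplified to `add` on maps only (the real helper also handles arrays and returns storage errors for non-object values):

```go
// deepAdd returns data with value stored at path, creating intermediate
// objects as needed. Anything that is not already an object is treated as
// absent here; the real patch helper instead returns a not-found error.
func deepAdd(data interface{}, path []string, idx int, value interface{}) interface{} {
	if idx == len(path) {
		return value
	}
	obj, ok := data.(map[string]interface{})
	if !ok { // the nil case: materialize the missing object
		obj = make(map[string]interface{}, 1)
	}
	obj[path[idx]] = deepAdd(obj[path[idx]], path, idx+1, value)
	return obj
}
```

For example, `deepAdd(nil, []string{"a", "b"}, 0, 7)` yields `map[string]interface{}{"a": map[string]interface{}{"b": 7}}`.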
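Likewise, the split of `ValidateArrayIndex` into a read and a write variant changes which error a bad array key maps to: a non-integer key under an array stays a not-found error on reads, but becomes a write-conflict on writes, reported against the parent path (`path[:i-1]`), while an out-of-range integer remains not-found in both cases. These helpers live under `storage/internal`, so the following sketch is only meaningful inside the storage packages:

```go
arr := []interface{}{"a", "b"}
path := storage.MustParsePath("/arr/foo") // "foo" sits at path position 1

// Reads: a non-integer key is simply not found.
_, err := ptr.ValidateArrayIndex(arr, "foo", path) // code storage.NotFoundErr

// Writes: the same key is a conflict on the parent /arr -- the document
// exists, it just cannot accept a non-numeric element. The i argument is
// the position one past the key, so path[:i-1] addresses the array itself.
_, err = ptr.ValidateArrayIndexForWrite(arr, "foo", 2, path) // storage.WriteConflictErr

// Out-of-range integers stay not-found for both variants.
_, err = ptr.ValidateArrayIndexForWrite(arr, "9", 2, path) // storage.NotFoundErr
_ = err
```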