From ac20ef2bf5c3f3d34c3bed3b5b6ea9a42f20ed22 Mon Sep 17 00:00:00 2001 From: Philip Conrad Date: Thu, 6 Oct 2022 16:27:54 -0400 Subject: [PATCH] server+sdk+plugins: Integrate NDBCache into decision logging. (#5147) This commit integrates the non-deterministic builtins caching system into decision logging, both in the server and sdk packages. Some reworking of the NDBCache's serialization format were required to accommodate this. The feature is disabled by default, and must be opted into by user configuration. The feature can be enabled via a top-level config key: nd_builtin_cache=true The NDBCache is exposed to the masking system under the `/nd_builtin_cache` path, which allows masking or dropping sensitive values from decision logs selectively. Note: If a decision log event exceeds the `upload_size_limit_bytes` value for the OPA instance, OPA will reattempt uploading it, after dropping the NDBCache from the event. This behavior will trigger a log error, and will increment the `decision_logs_nd_builtin_cache_dropped` metrics counter. Fixes: #1514 Signed-off-by: Philip Conrad --- config/config.go | 6 + docs/content/configuration.md | 1 + docs/content/management-decision-logs.md | 9 +- plugins/logs/mask.go | 23 +++- plugins/logs/plugin.go | 98 +++++++++----- plugins/logs/plugin_test.go | 161 +++++++++++++++++++++-- rego/rego.go | 20 ++- rego/rego_test.go | 2 +- runtime/runtime.go | 5 + sdk/opa.go | 28 +++- sdk/opa_test.go | 72 ++++++++++ server/buffer.go | 31 ++--- server/server.go | 76 ++++++++--- topdown/builtins/builtins.go | 49 ++++--- 14 files changed, 472 insertions(+), 109 deletions(-) diff --git a/config/config.go b/config/config.go index 837b075a38..57c9a5b641 100644 --- a/config/config.go +++ b/config/config.go @@ -32,6 +32,7 @@ type Config struct { DefaultDecision *string `json:"default_decision,omitempty"` DefaultAuthorizationDecision *string `json:"default_authorization_decision,omitempty"` Caching json.RawMessage `json:"caching,omitempty"` + NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"` PersistenceDirectory *string `json:"persistence_directory,omitempty"` DistributedTracing json.RawMessage `json:"distributed_tracing,omitempty"` Storage *struct { @@ -87,6 +88,11 @@ func (c Config) DefaultAuthorizationDecisionRef() ast.Ref { return r } +// NDBuiltinCacheEnabled returns if the ND builtins cache should be used. +func (c Config) NDBuiltinCacheEnabled() bool { + return c.NDBuiltinCache +} + func (c *Config) validateAndInjectDefaults(id string) error { if c.DefaultDecision == nil { diff --git a/docs/content/configuration.md b/docs/content/configuration.md index 419b9f361c..d98f645b15 100644 --- a/docs/content/configuration.md +++ b/docs/content/configuration.md @@ -740,6 +740,7 @@ For *GHCR* (Github Container Registry) you can use a developer PAT (personal acc | `default_authorization_decision` | `string` | No (default: `/system/authz/allow`) | Set path of default authorization decision for OPA's API. | | `persistence_directory` | `string` | No (default `$PWD/.opa`) | Set directory to use for persistence with options like `bundles[_].persist`. | | `plugins` | `object` | No (default: `{}`) | Location for custom plugin configuration. See [Plugins](../plugins) for details. | +| `nd_builtin_cache` | `boolean` | No (default: `false`) | Enable the non-deterministic builtins caching system during policy evaluation, and include the contents of the cache in decision logs. Note that decision logs that are larger than `upload_size_limit_bytes` will drop the `nd_builtin_cache` key from the log entry before uploading. | ### Keys diff --git a/docs/content/management-decision-logs.md b/docs/content/management-decision-logs.md index d466e4632f..b15843d8d2 100644 --- a/docs/content/management-decision-logs.md +++ b/docs/content/management-decision-logs.md @@ -75,6 +75,7 @@ Decision log updates contain the following fields: | `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). | | `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. | | `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. | +| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. | If the decision log was successfully uploaded to the remote service, it should respond with an HTTP 2xx status. If the service responds with a non-2xx status, OPA will requeue the last chunk containing decision log events and upload it @@ -95,6 +96,12 @@ the last chunk. `Equilibrium`: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value. +When an event containing `nd_builtin_cache` cannot fit into a chunk smaller than `upload_size_limit_bytes`, OPA will +drop the `nd_builtin_cache` key from the event, and will retry encoding the chunk without the non-deterministic +builtins cache information. This best-effort approach ensures that OPA reports decision log events as much as possible, +and bounds how large decision log events can get. This size-bounding is necessary, because some non-deterministic builtins +(such as `http.send`) can increase the decision log event size by a potentially unbounded amount. + ### Local Decision Logs Local console logging of decisions can be enabled via the `console` config option. @@ -172,7 +179,7 @@ from the decision log event. The erased paths are recorded on the event itself: There are a few restrictions on the JSON Pointers that OPA will erase: -* Pointers must be prefixed with `/input` or `/result`. +* Pointers must be prefixed with `/input`, `/result`, or `/nd_builtin_cache`. * Pointers may be undefined. For example `/input/name/first` in the example above would be undefined. Undefined pointers are ignored. * Pointers must refer to object keys. Pointers to array elements will be treated diff --git a/plugins/logs/mask.go b/plugins/logs/mask.go index 1355207e2f..9307e2f041 100644 --- a/plugins/logs/mask.go +++ b/plugins/logs/mask.go @@ -21,8 +21,9 @@ const ( maskOPRemove maskOP = "remove" maskOPUpsert maskOP = "upsert" - partInput = "input" - partResult = "result" + partInput = "input" + partResult = "result" + partNDBCache = "nd_builtin_cache" ) var ( @@ -64,7 +65,9 @@ func newMaskRule(path string, opts ...maskRuleOption) (*maskRule, error) { parts := strings.Split(path[1:], "/") - if parts[0] != partInput && parts[0] != partResult { + switch parts[0] { + case partInput, partResult, partNDBCache: // OK + default: return nil, fmt.Errorf("mask prefix not allowed: %v", parts[0]) } @@ -130,8 +133,8 @@ func withFailUndefinedPath() maskRuleOption { func (r maskRule) Mask(event *EventV1) error { - var maskObj *interface{} // pointer to event Input|Result object - var maskObjPtr **interface{} // pointer to the event Input|Result pointer itself + var maskObj *interface{} // pointer to event Input|Result|NDBCache object + var maskObjPtr **interface{} // pointer to the event Input|Result|NDBCache pointer itself switch p := r.escapedParts[0]; p { case partInput: @@ -143,7 +146,6 @@ func (r maskRule) Mask(event *EventV1) error { } maskObj = event.Input maskObjPtr = &event.Input - case partResult: if event.Result == nil { if r.failUndefinedPath { @@ -153,6 +155,15 @@ func (r maskRule) Mask(event *EventV1) error { } maskObj = event.Result maskObjPtr = &event.Result + case partNDBCache: + if event.NDBuiltinCache == nil { + if r.failUndefinedPath { + return errMaskInvalidObject + } + return nil + } + maskObj = event.NDBuiltinCache + maskObjPtr = &event.NDBuiltinCache default: return fmt.Errorf("illegal path value: %s", p) } diff --git a/plugins/logs/plugin.go b/plugins/logs/plugin.go index cd75fd7631..5ff1e99d47 100644 --- a/plugins/logs/plugin.go +++ b/plugins/logs/plugin.go @@ -44,21 +44,22 @@ type Logger interface { // the struct. Any changes here MUST be reflected in the AST() // implementation below. type EventV1 struct { - Labels map[string]string `json:"labels"` - DecisionID string `json:"decision_id"` - Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead - Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"` - Path string `json:"path,omitempty"` - Query string `json:"query,omitempty"` - Input *interface{} `json:"input,omitempty"` - Result *interface{} `json:"result,omitempty"` - MappedResult *interface{} `json:"mapped_result,omitempty"` - Erased []string `json:"erased,omitempty"` - Masked []string `json:"masked,omitempty"` - Error error `json:"error,omitempty"` - RequestedBy string `json:"requested_by,omitempty"` - Timestamp time.Time `json:"timestamp"` - Metrics map[string]interface{} `json:"metrics,omitempty"` + Labels map[string]string `json:"labels"` + DecisionID string `json:"decision_id"` + Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead + Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"` + Path string `json:"path,omitempty"` + Query string `json:"query,omitempty"` + Input *interface{} `json:"input,omitempty"` + Result *interface{} `json:"result,omitempty"` + MappedResult *interface{} `json:"mapped_result,omitempty"` + NDBuiltinCache *interface{} `json:"nd_builtin_cache,omitempty"` + Erased []string `json:"erased,omitempty"` + Masked []string `json:"masked,omitempty"` + Error error `json:"error,omitempty"` + RequestedBy string `json:"requested_by,omitempty"` + Timestamp time.Time `json:"timestamp"` + Metrics map[string]interface{} `json:"metrics,omitempty"` inputAST ast.Value } @@ -87,6 +88,7 @@ var queryKey = ast.StringTerm("query") var inputKey = ast.StringTerm("input") var resultKey = ast.StringTerm("result") var mappedResultKey = ast.StringTerm("mapped_result") +var ndBuiltinCacheKey = ast.StringTerm("nd_builtin_cache") var erasedKey = ast.StringTerm("erased") var maskedKey = ast.StringTerm("masked") var errorKey = ast.StringTerm("error") @@ -159,6 +161,14 @@ func (e *EventV1) AST() (ast.Value, error) { event.Insert(mappedResultKey, ast.NewTerm(mResults)) } + if e.NDBuiltinCache != nil { + ndbCache, err := roundtripJSONToAST(e.NDBuiltinCache) + if err != nil { + return nil, err + } + event.Insert(ndBuiltinCacheKey, ast.NewTerm(ndbCache)) + } + if len(e.Erased) > 0 { erased := make([]*ast.Term, len(e.Erased)) for i, v := range e.Erased { @@ -226,6 +236,7 @@ const ( defaultBufferSizeLimitBytes = int64(0) // unlimited defaultMaskDecisionPath = "/system/log/mask" logDropCounterName = "decision_logs_dropped" + logNDBDropCounterName = "decision_logs_nd_builtin_cache_dropped" defaultResourcePath = "/logs" ) @@ -248,6 +259,7 @@ type Config struct { MaskDecision *string `json:"mask_decision"` ConsoleLogs bool `json:"console"` Resource *string `json:"resource"` + NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"` maskDecisionRef ast.Ref } @@ -557,18 +569,19 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error { } event := EventV1{ - Labels: p.manager.Labels(), - DecisionID: decision.DecisionID, - Revision: decision.Revision, - Bundles: bundles, - Path: decision.Path, - Query: decision.Query, - Input: decision.Input, - Result: decision.Results, - MappedResult: decision.MappedResults, - RequestedBy: decision.RemoteAddr, - Timestamp: decision.Timestamp, - inputAST: decision.InputAST, + Labels: p.manager.Labels(), + DecisionID: decision.DecisionID, + Revision: decision.Revision, + Bundles: bundles, + Path: decision.Path, + Query: decision.Query, + Input: decision.Input, + Result: decision.Results, + MappedResult: decision.MappedResults, + NDBuiltinCache: decision.NDBuiltinCache, + RequestedBy: decision.RemoteAddr, + Timestamp: decision.Timestamp, + inputAST: decision.InputAST, } if decision.Metrics != nil { @@ -793,6 +806,9 @@ func (p *Plugin) reconfigure(config interface{}) { p.config = *newConfig } +// NOTE(philipc): Because ND builtins caching can cause unbounded growth in +// decision log entry size, we do best-effort event encoding here, and when we +// run out of space, we drop the ND builtins cache, and try encoding again. func (p *Plugin) encodeAndBufferEvent(event EventV1) { if p.limiter != nil { if !p.limiter.Allow() { @@ -807,11 +823,29 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) { result, err := p.enc.Write(event) if err != nil { - // TODO(tsandall): revisit this now that we have an API that - // can return an error. Should the default behaviour be to - // fail-closed as we do for plugins? - p.logger.Error("Log encoding failed: %v.", err) - return + // If there's no ND builtins cache in the event, then we don't + // need to retry encoding anything. + if event.NDBuiltinCache == nil { + // TODO(tsandall): revisit this now that we have an API that + // can return an error. Should the default behaviour be to + // fail-closed as we do for plugins? + p.logger.Error("Log encoding failed: %v.", err) + return + } + + // Attempt to encode the event again, dropping the ND builtins cache. + newEvent := event + newEvent.NDBuiltinCache = nil + + result, err = p.enc.Write(newEvent) + if err != nil { + p.logger.Error("Log encoding failed: %v.", err) + return + } + + // Re-encoding was successful, but we still need to alert users. + p.logger.Error("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.") + p.metrics.Counter(logNDBDropCounterName).Incr() } for _, chunk := range result { diff --git a/plugins/logs/plugin_test.go b/plugins/logs/plugin_test.go index ccd89125cb..24e8b1300e 100644 --- a/plugins/logs/plugin_test.go +++ b/plugins/logs/plugin_test.go @@ -17,6 +17,7 @@ import ( "net/http/httptest" "os" "reflect" + "strings" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/open-policy-agent/opa/storage" inmem "github.com/open-policy-agent/opa/storage/inmem/test" "github.com/open-policy-agent/opa/topdown" + "github.com/open-policy-agent/opa/topdown/builtins" "github.com/open-policy-agent/opa/topdown/print" "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/version" @@ -911,6 +913,57 @@ func TestPluginRateLimitDropCountStatus(t *testing.T) { } } +func TestChunkMaxUploadSizeLimitNDBCacheDropping(t *testing.T) { + ctx := context.Background() + testLogger := test.New() + + ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") + if err != nil { + panic(err) + } + + fixture := newTestFixture(t, testFixtureOptions{ + ConsoleLogger: testLogger, + ReportingMaxDecisionsPerSecond: float64(1), // 1 decision per second + ReportingUploadSizeLimitBytes: 400, + }) + defer fixture.server.stop() + + fixture.plugin.metrics = metrics.New() + + var input interface{} = map[string]interface{}{"method": "GET"} + var result interface{} = false + + // Purposely oversized NDBCache entry will force dropping during Log(). + var ndbCacheExample interface{} = ast.MustJSON(builtins.NDBCache{ + "test.custom_space_waster": ast.NewObject([2]*ast.Term{ + ast.ArrayTerm(), + ast.StringTerm(strings.Repeat("Wasted space... ", 200)), + }), + }.AsValue()) + + event := &server.Info{ + DecisionID: "abc", + Path: "foo/bar", + Input: &input, + Results: &result, + RemoteAddr: "test", + Timestamp: ts, + NDBuiltinCache: &ndbCacheExample, + } + + beforeNDBDropCount := fixture.plugin.metrics.Counter(logNDBDropCounterName).Value().(uint64) + err = fixture.plugin.Log(ctx, event) // event should be written into the encoder + if err != nil { + t.Fatal(err) + } + afterNDBDropCount := fixture.plugin.metrics.Counter(logNDBDropCounterName).Value().(uint64) + + if afterNDBDropCount != beforeNDBDropCount+1 { + t.Fatalf("Expected %v NDBCache drop events, saw %v events instead.", beforeNDBDropCount+1, afterNDBDropCount) + } +} + func TestPluginRateLimitBadConfig(t *testing.T) { manager, _ := plugins.New(nil, "test-instance-id", inmem.New()) @@ -1335,16 +1388,18 @@ func (a appendingPrintHook) Print(_ print.Context, s string) error { func TestPluginMasking(t *testing.T) { tests := []struct { - note string - rawPolicy []byte - expErased []string - expMasked []string - expPrinted []string - errManager error - expErr error - input interface{} - expected interface{} - reconfigure bool + note string + rawPolicy []byte + expErased []string + expMasked []string + expPrinted []string + errManager error + expErr error + input interface{} + expected interface{} + ndbcache interface{} + ndbc_expected interface{} + reconfigure bool }{ { note: "simple erase (with body true)", @@ -1561,6 +1616,59 @@ func TestPluginMasking(t *testing.T) { }, expPrinted: []string{"Erasing /input/password"}, }, + { + note: "simple upsert on nd_builtin_cache", + rawPolicy: []byte(` + package system.log + mask[{"op": "upsert", "path": "/nd_builtin_cache/rand.intn", "value": x}] { + input.nd_builtin_cache["rand.intn"] + x := "**REDACTED**" + }`), + expMasked: []string{"/nd_builtin_cache/rand.intn"}, + ndbcache: map[string]interface{}{ + // Simulate rand.intn("z", 15) call, with output of 7. + "rand.intn": map[string]interface{}{"[\"z\",15]": json.Number("7")}, + }, + ndbc_expected: map[string]interface{}{ + "rand.intn": "**REDACTED**", + }, + }, + { + note: "simple upsert on nd_builtin_cache with multiple entries", + rawPolicy: []byte(` + package system.log + mask[{"op": "upsert", "path": "/nd_builtin_cache/rand.intn", "value": x}] { + input.nd_builtin_cache["rand.intn"] + x := "**REDACTED**" + } + + mask[{"op": "upsert", "path": "/nd_builtin_cache/net.lookup_ip_addr", "value": y}] { + obj := input.nd_builtin_cache["net.lookup_ip_addr"] + y := object.union({k: "4.4.x.x" | obj[k]; startswith(k, "[\"4.4.")}, + {k: obj[k] | obj[k]; not startswith(k, "[\"4.4.")}) + } + `), + expMasked: []string{"/nd_builtin_cache/net.lookup_ip_addr", "/nd_builtin_cache/rand.intn"}, + ndbcache: map[string]interface{}{ + // Simulate rand.intn("z", 15) call, with output of 7. + "rand.intn": map[string]interface{}{"[\"z\",15]": json.Number("7")}, + "net.lookup_ip_addr": map[string]interface{}{ + "[\"1.1.1.1\"]": "1.1.1.1", + "[\"2.2.2.2\"]": "2.2.2.2", + "[\"3.3.3.3\"]": "3.3.3.3", + "[\"4.4.4.4\"]": "4.4.4.4", + }, + }, + ndbc_expected: map[string]interface{}{ + "rand.intn": "**REDACTED**", + "net.lookup_ip_addr": map[string]interface{}{ + "[\"1.1.1.1\"]": "1.1.1.1", + "[\"2.2.2.2\"]": "2.2.2.2", + "[\"3.3.3.3\"]": "3.3.3.3", + "[\"4.4.4.4\"]": "4.4.x.x", + }, + }, + }, } for _, tc := range tests { @@ -1612,7 +1720,8 @@ func TestPluginMasking(t *testing.T) { } event := &EventV1{ - Input: &tc.input, + Input: &tc.input, + NDBuiltinCache: &tc.ndbcache, } if err := plugin.maskEvent(ctx, nil, event); err != nil { @@ -1623,6 +1732,10 @@ func TestPluginMasking(t *testing.T) { t.Fatalf("Expected %#+v but got %#+v:", tc.expected, *event.Input) } + if !reflect.DeepEqual(tc.ndbc_expected, *event.NDBuiltinCache) { + t.Fatalf("Expected %#+v but got %#+v:", tc.ndbc_expected, *event.NDBuiltinCache) + } + if len(tc.expErased) > 0 { if !reflect.DeepEqual(tc.expErased, event.Erased) { t.Fatalf("Expected erased %v set but got %v", tc.expErased, event.Erased) @@ -1920,6 +2033,13 @@ func TestEventV1ToAST(t *testing.T) { t.Fatalf("Unexpected error: %s", err) } + var ndbCacheExample interface{} = ast.MustJSON(builtins.NDBCache{ + "time.now_ns": ast.NewObject([2]*ast.Term{ + ast.ArrayTerm(), + ast.NumberTerm("1663803565571081429"), + }), + }.AsValue()) + cases := []struct { note string event EventV1 @@ -2038,6 +2158,25 @@ func TestEventV1ToAST(t *testing.T) { note: "big event", event: bigEvent, }, + { + note: "event with nd_builtin_cache", + event: EventV1{ + Labels: map[string]string{"foo": "1", "bar": "2"}, + DecisionID: "1234567890", + Bundles: map[string]BundleInfoV1{ + "b1": {"revision7"}, + "b2": {"0"}, + "b3": {}, + }, + Input: &goInput, + Path: "/http/authz/allow", + RequestedBy: "[::1]:59943", + Result: &result, + Timestamp: time.Now(), + inputAST: astInput, + NDBuiltinCache: &ndbCacheExample, + }, + }, } for _, tc := range cases { diff --git a/rego/rego.go b/rego/rego.go index 71efefb979..97c052d057 100644 --- a/rego/rego.go +++ b/rego/rego.go @@ -1186,10 +1186,13 @@ func (r *Rego) Eval(ctx context.Context) (ResultSet, error) { EvalInstrument(r.instrument), EvalTime(r.time), EvalInterQueryBuiltinCache(r.interQueryBuiltinCache), - EvalNDBuiltinCache(r.ndBuiltinCache), EvalSeed(r.seed), } + if r.ndBuiltinCache != nil { + evalArgs = append(evalArgs, EvalNDBuiltinCache(r.ndBuiltinCache)) + } + for _, qt := range r.queryTracers { evalArgs = append(evalArgs, EvalQueryTracer(qt)) } @@ -1260,7 +1263,10 @@ func (r *Rego) Partial(ctx context.Context) (*PartialQueries, error) { EvalMetrics(r.metrics), EvalInstrument(r.instrument), EvalInterQueryBuiltinCache(r.interQueryBuiltinCache), - EvalNDBuiltinCache(r.ndBuiltinCache), + } + + if r.ndBuiltinCache != nil { + evalArgs = append(evalArgs, EvalNDBuiltinCache(r.ndBuiltinCache)) } for _, t := range r.queryTracers { @@ -1933,7 +1939,6 @@ func (r *Rego) eval(ctx context.Context, ectx *EvalContext) (ResultSet, error) { WithIndexing(ectx.indexing). WithEarlyExit(ectx.earlyExit). WithInterQueryBuiltinCache(ectx.interQueryBuiltinCache). - WithNDBuiltinCache(ectx.ndBuiltinCache). WithStrictBuiltinErrors(r.strictBuiltinErrors). WithSeed(ectx.seed). WithPrintHook(ectx.printHook). @@ -1943,6 +1948,10 @@ func (r *Rego) eval(ctx context.Context, ectx *EvalContext) (ResultSet, error) { q = q.WithTime(ectx.time) } + if ectx.ndBuiltinCache != nil { + q = q.WithNDBuiltinCache(ectx.ndBuiltinCache) + } + for i := range ectx.queryTracers { q = q.WithQueryTracer(ectx.queryTracers[i]) } @@ -2210,7 +2219,6 @@ func (r *Rego) partial(ctx context.Context, ectx *EvalContext) (*PartialQueries, WithSkipPartialNamespace(r.skipPartialNamespace). WithShallowInlining(r.shallowInlining). WithInterQueryBuiltinCache(ectx.interQueryBuiltinCache). - WithNDBuiltinCache(ectx.ndBuiltinCache). WithStrictBuiltinErrors(r.strictBuiltinErrors). WithSeed(ectx.seed). WithPrintHook(ectx.printHook) @@ -2219,6 +2227,10 @@ func (r *Rego) partial(ctx context.Context, ectx *EvalContext) (*PartialQueries, q = q.WithTime(ectx.time) } + if ectx.ndBuiltinCache != nil { + q = q.WithNDBuiltinCache(ectx.ndBuiltinCache) + } + for i := range ectx.queryTracers { q = q.WithQueryTracer(ectx.queryTracers[i]) } diff --git a/rego/rego_test.go b/rego/rego_test.go index daad545251..0c2ab3363f 100644 --- a/rego/rego_test.go +++ b/rego/rego_test.go @@ -2086,7 +2086,7 @@ func TestEvalWithNDCache(t *testing.T) { } // Check and make sure we got exactly 2x items back in the ND builtin cache. - // NDBuiltinsCache always has the structure: map[ast.String]map[ast.Array]ast.Value + // NDBuiltinCache always has the structure: map[ast.String]map[ast.Array]ast.Value if len(ndBC) != 2 { t.Fatalf("Expected exactly 2 items in non-deterministic builtin cache. Found %d items.\n", len(ndBC)) } diff --git a/runtime/runtime.go b/runtime/runtime.go index f03df1757b..1cd8eb8cf4 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -474,6 +474,11 @@ func (rt *Runtime) Serve(ctx context.Context) error { WithMinTLSVersion(rt.Params.MinTLSVersion). WithDistributedTracingOpts(rt.Params.DistributedTracingOpts) + // If decision_logging plugin enabled, check to see if we opted in to the ND builtins cache. + if lp := logs.Lookup(rt.Manager); lp != nil { + rt.server = rt.server.WithNDBCacheEnabled(rt.Manager.Config.NDBuiltinCacheEnabled()) + } + if rt.Params.DiagnosticAddrs != nil { rt.server = rt.server.WithDiagnosticAddresses(*rt.Params.DiagnosticAddrs) } diff --git a/sdk/opa.go b/sdk/opa.go index da1b988ad9..2423b255f7 100644 --- a/sdk/opa.go +++ b/sdk/opa.go @@ -27,6 +27,7 @@ import ( "github.com/open-policy-agent/opa/server" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/storage/inmem" + "github.com/open-policy-agent/opa/topdown/builtins" "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/print" ) @@ -221,9 +222,18 @@ func (opa *OPA) Stop(ctx context.Context) { func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*DecisionResult, error) { record := server.Info{ - Timestamp: options.Now, - Path: options.Path, - Input: &options.Input, + Timestamp: options.Now, + Path: options.Path, + Input: &options.Input, + NDBuiltinCache: &options.NDBCache, + } + + // Only use non-deterministic builtins cache if it's available. + var ndbc builtins.NDBCache + if options.NDBCache != nil { + if v, ok := options.NDBCache.(builtins.NDBCache); ok { + ndbc = v + } } result, err := opa.executeTransaction( @@ -237,6 +247,7 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio store: s.manager.Store, queryCache: s.queryCache, interQueryCache: s.interQueryBuiltinCache, + ndbcache: ndbc, txn: record.Txn, now: record.Timestamp, path: record.Path, @@ -257,9 +268,10 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio // DecisionOptions contains parameters for query evaluation. type DecisionOptions struct { - Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc. - Path string // specifies name of policy decision to evaluate (e.g., example/allow) - Input interface{} // specifies value of the input document to evaluate policy with + Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc. + Path string // specifies name of policy decision to evaluate (e.g., example/allow) + Input interface{} // specifies value of the input document to evaluate policy with + NDBCache interface{} // specifies the non-deterministic builtins cache to use for evaluation. } // DecisionResult contains the output of query evaluation. @@ -319,6 +331,8 @@ func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, wor } // Partial returns a named decision. This function is threadsafe. +// Note(philipc): The NDBCache is unused here, because non-deterministic +// builtins are not run during partial evaluation. func (opa *OPA) Partial(ctx context.Context, options PartialOptions) (*PartialResult, error) { if options.Mapper == nil { @@ -433,6 +447,7 @@ type evalArgs struct { now time.Time path string input interface{} + ndbcache builtins.NDBCache m metrics.Metrics } @@ -479,6 +494,7 @@ func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[s rego.EvalTransaction(args.txn), rego.EvalMetrics(args.m), rego.EvalInterQueryBuiltinCache(args.interQueryCache), + rego.EvalNDBuiltinCache(args.ndbcache), ) if err != nil { return nil, inputAST, bundles, err diff --git a/sdk/opa_test.go b/sdk/opa_test.go index 763bfe4679..ad667976b0 100644 --- a/sdk/opa_test.go +++ b/sdk/opa_test.go @@ -14,8 +14,10 @@ import ( "testing" "time" + "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/rego" + "github.com/open-policy-agent/opa/topdown/builtins" "github.com/fortytw2/leaktest" loggingtest "github.com/open-policy-agent/opa/logging/test" @@ -398,6 +400,76 @@ func TestDecisionLogging(t *testing.T) { } +func TestDecisionLoggingWithNDBCache(t *testing.T) { + + ctx := context.Background() + + server := sdktest.MustNewServer( + sdktest.MockBundle("/bundles/bundle.tar.gz", map[string]string{ + "main.rego": "package system\nmain = time.now_ns()", + }), + ) + + defer server.Stop() + + config := fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "bundles": { + "test": { + "resource": "/bundles/bundle.tar.gz" + } + }, + "decision_logs": { + "console": true, + "nd_builtin_cache": true + } + }`, server.URL()) + + testLogger := loggingtest.New() + opa, err := sdk.New(ctx, sdk.Options{ + Config: strings.NewReader(config), + ConsoleLogger: testLogger, + }) + if err != nil { + t.Fatal(err) + } + + defer opa.Stop(ctx) + + // Build ND builtins cache, and populate with an unused builtin. + ndbc := builtins.NDBCache{} + ndbc.Put("rand.intn", ast.NewArray(), ast.NewObject([2]*ast.Term{ast.StringTerm("z"), ast.IntNumberTerm(7)})) + + // Verify that timestamp matches time.now_ns() value. + if _, err := opa.Decision(ctx, sdk.DecisionOptions{ + Now: time.Unix(0, 1619868194450288000).UTC(), + NDBCache: ndbc, + }); err != nil { + t.Fatal(err) + } + + entries := testLogger.Entries() + + // Check the contents of the ND builtins cache. + if cache, ok := entries[0].Fields["nd_builtin_cache"]; ok { + // Ensure the original cache entry for rand.intn is still there. + if _, ok := cache.(map[string]interface{})["rand.intn"]; !ok { + t.Fatalf("ND builtins cache was not preserved during evaluation.") + } + // Ensure time.now_ns entry was picked up correctly. + if _, ok := cache.(map[string]interface{})["time.now_ns"]; !ok { + t.Fatalf("ND builtins cache did not observe time.now_ns call during evaluation.") + } + } else { + t.Fatalf("ND builtins cache missing.") + } + +} + func TestQueryCaching(t *testing.T) { ctx := context.Background() diff --git a/server/buffer.go b/server/buffer.go index 2cb83e1d97..6544797d10 100644 --- a/server/buffer.go +++ b/server/buffer.go @@ -15,21 +15,22 @@ import ( // Info contains information describing a policy decision. type Info struct { - Txn storage.Transaction - Revision string // Deprecated: Use `Bundles` instead - Bundles map[string]BundleInfo - DecisionID string - RemoteAddr string - Query string - Path string - Timestamp time.Time - Input *interface{} - InputAST ast.Value - Results *interface{} - MappedResults *interface{} - Error error - Metrics metrics.Metrics - Trace []*topdown.Event + Txn storage.Transaction + Revision string // Deprecated: Use `Bundles` instead + Bundles map[string]BundleInfo + DecisionID string + RemoteAddr string + Query string + Path string + Timestamp time.Time + Input *interface{} + InputAST ast.Value + Results *interface{} + MappedResults *interface{} + NDBuiltinCache *interface{} + Error error + Metrics metrics.Metrics + Trace []*topdown.Event } // BundleInfo contains information describing a bundle. diff --git a/server/server.go b/server/server.go index 39cecf3189..b8d74bb5c3 100644 --- a/server/server.go +++ b/server/server.go @@ -46,6 +46,7 @@ import ( "github.com/open-policy-agent/opa/server/writer" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/topdown" + "github.com/open-policy-agent/opa/topdown/builtins" iCache "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/lineage" "github.com/open-policy-agent/opa/tracing" @@ -136,6 +137,7 @@ type Server struct { interQueryBuiltinCache iCache.InterQueryCache allPluginsOkOnce bool distributedTracingOpts tracing.Options + ndbCacheEnabled bool } // Metrics defines the interface that the server requires for recording HTTP @@ -349,6 +351,12 @@ func (s *Server) WithDistributedTracingOpts(opts tracing.Options) *Server { return s } +// WithNDBCacheEnabled sets whether the ND builtins cache is to be used. +func (s *Server) WithNDBCacheEnabled(ndbCacheEnabled bool) *Server { + s.ndbCacheEnabled = ndbCacheEnabled + return s +} + // Listeners returns functions that listen and serve connections. func (s *Server) Listeners() ([]Loop, error) { loops := []Loop{} @@ -769,6 +777,11 @@ func (s *Server) execQuery(ctx context.Context, r *http.Request, br bundleRevisi rawInput = &x } + var ndbCache builtins.NDBCache + if s.ndbCacheEnabled { + ndbCache = builtins.NDBCache{} + } + opts := []func(*rego.Rego){ rego.Store(s.store), rego.Transaction(txn), @@ -784,6 +797,7 @@ func (s *Server) execQuery(ctx context.Context, r *http.Request, br bundleRevisi rego.PrintHook(s.manager.PrintHook()), rego.EnablePrintStatements(s.manager.EnablePrintStatements()), rego.DistributedTracingOpts(s.distributedTracingOpts), + rego.NDBuiltinCache(ndbCache), } for _, r := range s.manager.GetWasmResolvers() { @@ -796,7 +810,7 @@ func (s *Server) execQuery(ctx context.Context, r *http.Request, br bundleRevisi output, err := rego.Eval(ctx) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, "", parsedQuery.String(), rawInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, "", parsedQuery.String(), rawInput, input, nil, ndbCache, err, m) return results, err } @@ -813,7 +827,7 @@ func (s *Server) execQuery(ctx context.Context, r *http.Request, br bundleRevisi } var x interface{} = results.Result - err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, "", parsedQuery.String(), rawInput, input, &x, nil, m) + err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, "", parsedQuery.String(), rawInput, input, &x, ndbCache, nil, m) return results, err } @@ -938,6 +952,11 @@ func (s *Server) v0QueryPath(w http.ResponseWriter, r *http.Request, urlPath str logger := s.getDecisionLogger(br) + var ndbCache builtins.NDBCache + if s.ndbCacheEnabled { + ndbCache = builtins.NDBCache{} + } + pqID := "v0QueryPath::" + urlPath preparedQuery, ok := s.getCachedPreparedEvalQuery(pqID, m) if !ok { @@ -957,14 +976,14 @@ func (s *Server) v0QueryPath(w http.ResponseWriter, r *http.Request, urlPath str partial, strictBuiltinErrors, instrument := false, false, false rego, err := s.makeRego(ctx, partial, strictBuiltinErrors, txn, input, urlPath, m, instrument, nil, opts) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } pq, err := rego.PrepareForEval(ctx) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } @@ -977,6 +996,7 @@ func (s *Server) v0QueryPath(w http.ResponseWriter, r *http.Request, urlPath str rego.EvalParsedInput(input), rego.EvalMetrics(m), rego.EvalInterQueryBuiltinCache(s.interQueryBuiltinCache), + rego.EvalNDBuiltinCache(ndbCache), } rs, err := preparedQuery.Eval( @@ -988,14 +1008,14 @@ func (s *Server) v0QueryPath(w http.ResponseWriter, r *http.Request, urlPath str // Handle results. if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } if len(rs) == 0 { err := types.NewErrorV1(types.CodeUndefinedDocument, fmt.Sprintf("%v: %v", types.MsgUndefinedError, stringPathToDataRef(urlPath))) - if logErr := logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m); logErr != nil { + if logErr := logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m); logErr != nil { writer.ErrorAuto(w, logErr) return } @@ -1003,7 +1023,7 @@ func (s *Server) v0QueryPath(w http.ResponseWriter, r *http.Request, urlPath str writer.Error(w, http.StatusNotFound, err) return } - err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, &rs[0].Expressions[0].Value, nil, m) + err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, &rs[0].Expressions[0].Value, ndbCache, nil, m) if err != nil { writer.ErrorAuto(w, err) return @@ -1350,6 +1370,11 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { logger := s.getDecisionLogger(br) + var ndbCache builtins.NDBCache + if s.ndbCacheEnabled { + ndbCache = builtins.NDBCache{} + } + var buf *topdown.BufferTracer if explainMode != types.ExplainOffV1 { @@ -1377,14 +1402,14 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { partial := false rego, err := s.makeRego(ctx, partial, strictBuiltinErrors, txn, input, urlPath, m, includeInstrumentation, buf, opts) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } pq, err := rego.PrepareForEval(ctx) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } @@ -1399,6 +1424,7 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { rego.EvalQueryTracer(buf), rego.EvalInterQueryBuiltinCache(s.interQueryBuiltinCache), rego.EvalInstrument(includeInstrumentation), + rego.EvalNDBuiltinCache(ndbCache), } rs, err := preparedQuery.Eval( @@ -1410,7 +1436,7 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { // Handle results. if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } @@ -1435,7 +1461,7 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { return } } - err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, nil, m) + err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, nil, m) if err != nil { writer.ErrorAuto(w, err) return @@ -1450,7 +1476,7 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) { result.Explanation = s.getExplainResponse(explainMode, *buf, pretty) } - err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, result.Result, nil, m) + err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, result.Result, ndbCache, nil, m) if err != nil { writer.ErrorAuto(w, err) return @@ -1577,6 +1603,11 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { logger := s.getDecisionLogger(br) + var ndbCache builtins.NDBCache + if s.ndbCacheEnabled { + ndbCache = builtins.NDBCache{} + } + var buf *topdown.BufferTracer if explainMode != types.ExplainOffV1 { @@ -1608,14 +1639,14 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { rego, err := s.makeRego(ctx, partial, strictBuiltinErrors, txn, input, urlPath, m, includeInstrumentation, buf, opts) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } pq, err := rego.PrepareForEval(ctx) if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } @@ -1630,6 +1661,7 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { rego.EvalQueryTracer(buf), rego.EvalInterQueryBuiltinCache(s.interQueryBuiltinCache), rego.EvalInstrument(includeInstrumentation), + rego.EvalNDBuiltinCache(ndbCache), } rs, err := preparedQuery.Eval( @@ -1641,7 +1673,7 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { // Handle results. if err != nil { - _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, err, m) + _ = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, err, m) writer.ErrorAuto(w, err) return } @@ -1670,7 +1702,7 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { return } } - err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, nil, m) + err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, nil, ndbCache, nil, m) if err != nil { writer.ErrorAuto(w, err) return @@ -1685,7 +1717,7 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) { result.Explanation = s.getExplainResponse(explainMode, *buf, pretty) } - err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, result.Result, nil, m) + err = logger.Log(ctx, txn, decisionID, r.RemoteAddr, urlPath, "", goInput, input, result.Result, ndbCache, nil, m) if err != nil { writer.ErrorAuto(w, err) return @@ -2867,7 +2899,7 @@ type decisionLogger struct { logger func(context.Context, *Info) error } -func (l decisionLogger) Log(ctx context.Context, txn storage.Transaction, decisionID, remoteAddr, path string, query string, goInput *interface{}, astInput ast.Value, goResults *interface{}, err error, m metrics.Metrics) error { +func (l decisionLogger) Log(ctx context.Context, txn storage.Transaction, decisionID, remoteAddr, path string, query string, goInput *interface{}, astInput ast.Value, goResults *interface{}, ndbCache builtins.NDBCache, err error, m metrics.Metrics) error { bundles := map[string]BundleInfo{} for name, rev := range l.revisions { @@ -2890,6 +2922,14 @@ func (l decisionLogger) Log(ctx context.Context, txn storage.Transaction, decisi Metrics: m, } + if ndbCache != nil { + x, err := ast.JSON(ndbCache.AsValue()) + if err != nil { + return err + } + info.NDBuiltinCache = &x + } + if l.logger != nil { if err := l.logger(ctx, info); err != nil { return fmt.Errorf("decision_logs: %w", err) diff --git a/topdown/builtins/builtins.go b/topdown/builtins/builtins.go index b7fa8bb27a..d7c3afbd89 100644 --- a/topdown/builtins/builtins.go +++ b/topdown/builtins/builtins.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/open-policy-agent/opa/ast" + "github.com/open-policy-agent/opa/util" ) // Cache defines the built-in cache used by the top-down evaluation. The keys @@ -34,6 +35,14 @@ func (c Cache) Get(k interface{}) (interface{}, bool) { // the member keys. type NDBCache map[string]ast.Object +func (c NDBCache) AsValue() ast.Value { + out := ast.NewObject() + for bname, obj := range c { + out.Insert(ast.StringTerm(bname), ast.NewTerm(obj)) + } + return out +} + // Put updates the cache for the named built-in. // Automatically creates the 2-level hierarchy as needed. func (c NDBCache) Put(name string, k, v ast.Value) { @@ -57,31 +66,41 @@ func (c NDBCache) Get(name string, k ast.Value) (ast.Value, bool) { // Convenience functions for serializing the data structure. func (c NDBCache) MarshalJSON() ([]byte, error) { - out := make(map[string]json.RawMessage) - for bname, obj := range c { - j, err := json.Marshal(ast.NewTerm(obj)) - if err != nil { - return nil, err - } - out[bname] = j + v, err := ast.JSON(c.AsValue()) + if err != nil { + return nil, err } - return json.Marshal(out) + return json.Marshal(v) } func (c *NDBCache) UnmarshalJSON(data []byte) error { out := map[string]ast.Object{} - var incoming map[string]ast.Term + var incoming interface{} - // We deserialize into a map of Terms, and then extract out the Objects. - err := json.Unmarshal(data, &incoming) + // Note: We use util.Unmarshal instead of json.Unmarshal to get + // correct deserialization of number types. + err := util.Unmarshal(data, &incoming) if err != nil { return err } - for k, v := range incoming { - if obj, ok := v.Value.(ast.Object); ok { - out[k] = obj - } else { + + // Convert interface types back into ast.Value types. + nestedObject, err := ast.InterfaceToValue(incoming) + if err != nil { + return err + } + + // Reconstruct NDBCache from nested ast.Object structure. + if source, ok := nestedObject.(ast.Object); ok { + err = source.Iter(func(k, v *ast.Term) error { + if obj, ok := v.Value.(ast.Object); ok { + out[string(k.Value.(ast.String))] = obj + return nil + } return fmt.Errorf("expected Object, got other Value type in conversion") + }) + if err != nil { + return err } }