diff --git a/sdk/opa.go b/sdk/opa.go index 545478d88a..6b47f77de3 100644 --- a/sdk/opa.go +++ b/sdk/opa.go @@ -227,6 +227,7 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio Path: options.Path, Input: &options.Input, NDBuiltinCache: &options.NDBCache, + Metrics: options.Metrics, } // Only use non-deterministic builtins cache if it's available. @@ -256,6 +257,8 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio m: record.Metrics, strictBuiltinErrors: options.StrictBuiltinErrors, tracer: options.Tracer, + profiler: options.Profiler, + instrument: options.Instrument, }) if record.Error == nil { record.Results = &result.Result @@ -277,6 +280,9 @@ type DecisionOptions struct { NDBCache interface{} // specifies the non-deterministic builtins cache to use for evaluation. StrictBuiltinErrors bool // treat built-in function errors as fatal Tracer topdown.QueryTracer // specifies the tracer to use for evaluation, optional + Metrics metrics.Metrics // specifies the metrics to use for preparing and evaluation, optional + Profiler topdown.QueryTracer // specifies the profiler to use, optional + Instrument bool // if true, instrumentation will be enabled } // DecisionResult contains the output of query evaluation. @@ -295,8 +301,10 @@ func newDecisionResult() (*DecisionResult, error) { } func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, work func(state, *DecisionResult)) (*DecisionResult, error) { - m := metrics.New() - m.Timer(metrics.SDKDecisionEval).Start() + if record.Metrics == nil { + record.Metrics = metrics.New() + } + record.Metrics.Timer(metrics.SDKDecisionEval).Start() result, err := newDecisionResult() if err != nil { @@ -308,7 +316,6 @@ func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, wor opa.mtx.Unlock() record.DecisionID = result.ID - record.Metrics = m if record.Timestamp.IsZero() { record.Timestamp = time.Now().UTC() @@ -325,7 +332,7 @@ func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, wor work(s, result) } - m.Timer(metrics.SDKDecisionEval).Stop() + record.Metrics.Timer(metrics.SDKDecisionEval).Stop() if logger := logs.Lookup(s.manager); logger != nil { if err := logger.Log(ctx, record); err != nil { @@ -348,6 +355,7 @@ func (opa *OPA) Partial(ctx context.Context, options PartialOptions) (*PartialRe Timestamp: options.Now, Input: &options.Input, Query: options.Query, + Metrics: options.Metrics, } var pq *rego.PartialQueries @@ -368,6 +376,8 @@ func (opa *OPA) Partial(ctx context.Context, options PartialOptions) (*PartialRe m: record.Metrics, strictBuiltinErrors: options.StrictBuiltinErrors, tracer: options.Tracer, + profiler: options.Profiler, + instrument: options.Instrument, }) if record.Error == nil { result.Result, record.Error = options.Mapper.MapResults(pq) @@ -409,6 +419,9 @@ type PartialOptions struct { Mapper PartialQueryMapper // specifies the mapper to use when processing results StrictBuiltinErrors bool // treat built-in function errors as fatal Tracer topdown.QueryTracer // specifies the tracer to use for evaluation, optional + Metrics metrics.Metrics // specifies the metrics to use for preparing and evaluation, optional + Profiler topdown.QueryTracer // specifies the profiler to use, optional + Instrument bool // if true, instrumentation will be enabled } type PartialResult struct { @@ -460,6 +473,8 @@ type evalArgs struct { m metrics.Metrics strictBuiltinErrors bool tracer topdown.QueryTracer + profiler topdown.QueryTracer + instrument bool } func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[string]server.BundleInfo, error) { @@ -484,6 +499,7 @@ func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[s rego.Transaction(args.txn), rego.PrintHook(args.printHook), rego.StrictBuiltinErrors(args.strictBuiltinErrors), + rego.Instrument(args.instrument), rego.Runtime(args.runtime)).PrepareForEval(ctx) if err != nil { return nil, err @@ -508,6 +524,9 @@ func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[s rego.EvalInterQueryBuiltinCache(args.interQueryCache), rego.EvalNDBuiltinCache(args.ndbcache), rego.EvalQueryTracer(args.tracer), + rego.EvalMetrics(args.m), + rego.EvalQueryTracer(args.profiler), + rego.EvalInstrument(args.instrument), ) if err != nil { return nil, inputAST, bundles, err @@ -531,6 +550,8 @@ type partialEvalArgs struct { m metrics.Metrics strictBuiltinErrors bool tracer topdown.QueryTracer + profiler topdown.QueryTracer + instrument bool } func partial(ctx context.Context, args partialEvalArgs) (*rego.PartialQueries, ast.Value, map[string]server.BundleInfo, error) { @@ -557,6 +578,8 @@ func partial(ctx context.Context, args partialEvalArgs) (*rego.PartialQueries, a rego.PrintHook(args.printHook), rego.StrictBuiltinErrors(args.strictBuiltinErrors), rego.QueryTracer(args.tracer), + rego.QueryTracer(args.profiler), + rego.Instrument(args.instrument), ) pq, err := re.Partial(ctx) diff --git a/sdk/opa_test.go b/sdk/opa_test.go index 4106d57fbb..98c49ef710 100644 --- a/sdk/opa_test.go +++ b/sdk/opa_test.go @@ -16,6 +16,8 @@ import ( "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/logging" + "github.com/open-policy-agent/opa/metrics" + "github.com/open-policy-agent/opa/profiler" "github.com/open-policy-agent/opa/rego" "github.com/open-policy-agent/opa/topdown" "github.com/open-policy-agent/opa/topdown/builtins" @@ -357,6 +359,171 @@ main { } } +func TestDecisionWithMetrics(t *testing.T) { + + ctx := context.Background() + + server := sdktest.MustNewServer( + sdktest.MockBundle("/bundles/bundle.tar.gz", map[string]string{ + "main.rego": ` +package system + +main = true +`, + }), + ) + + defer server.Stop() + + config := fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "bundles": { + "test": { + "resource": "/bundles/bundle.tar.gz" + } + } + }`, server.URL()) + + opa, err := sdk.New(ctx, sdk.Options{ + Config: strings.NewReader(config), + }) + if err != nil { + t.Fatal(err) + } + + defer opa.Stop(ctx) + + m := metrics.New() + if result, err := opa.Decision(ctx, sdk.DecisionOptions{ + Metrics: m, + }); err != nil { + t.Fatal(err) + } else if decision, ok := result.Result.(bool); !ok || !decision { + t.Fatal("expected true but got:", decision, ok) + } + + if exp, act := 4, len(m.All()); exp != act { + t.Fatalf("expected %d metrics, got %d", exp, act) + } + + expectedRecordedMetricGroups := map[string]bool{ + "timer_rego": false, + "timer_sdk": false, + } + for k := range m.All() { + for group, found := range expectedRecordedMetricGroups { + if found { + continue + } + if strings.HasPrefix(k, group) { + expectedRecordedMetricGroups[group] = true + } + } + } + for group, found := range expectedRecordedMetricGroups { + if !found { + t.Errorf("expected metric group %s not recorded", group) + } + } + +} + +func TestDecisionWithIntrumentationAndProfile(t *testing.T) { + + ctx := context.Background() + + server := sdktest.MustNewServer( + sdktest.MockBundle("/bundles/bundle.tar.gz", map[string]string{ + "main.rego": ` +package system + +main = true +`, + }), + ) + + defer server.Stop() + + config := fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "bundles": { + "test": { + "resource": "/bundles/bundle.tar.gz" + } + } + }`, server.URL()) + + opa, err := sdk.New(ctx, sdk.Options{ + Config: strings.NewReader(config), + }) + if err != nil { + t.Fatal(err) + } + + defer opa.Stop(ctx) + + m := metrics.New() + p := profiler.New() + if result, err := opa.Decision(ctx, sdk.DecisionOptions{ + Metrics: m, + Profiler: p, + Instrument: true, + }); err != nil { + t.Fatal(err) + } else if decision, ok := result.Result.(bool); !ok || !decision { + t.Fatal("expected true but got:", decision, ok) + } + + if exp, act := 25, len(m.All()); exp != act { + t.Fatalf("expected %d metrics, got %d", exp, act) + } + + expectedRecordedMetricGroups := map[string]bool{ + "counter_eval": false, + "histogram_eval": false, + "timer_query_compile": false, + "timer_eval": false, + "timer_rego": false, + "timer_sdk": false, + } + for k := range m.All() { + for group, found := range expectedRecordedMetricGroups { + if found { + continue + } + if strings.HasPrefix(k, group) { + expectedRecordedMetricGroups[group] = true + } + } + } + for group, found := range expectedRecordedMetricGroups { + if !found { + t.Errorf("expected metric group %s not recorded", group) + } + } + + stats := p.ReportTopNResults(10, []string{"line"}) + + if exp, act := 2, len(stats); exp != act { + t.Fatalf("expected %d stats, got %d", exp, act) + } + if exp, act := "true", string(stats[0].Location.Text); exp != act { + t.Errorf("expected location %q got %q", exp, act) + } + if exp, act := "data.system.main", string(stats[1].Location.Text); exp != act { + t.Errorf("expected location %q got %q", exp, act) + } + +} + func TestPartial(t *testing.T) { ctx := context.Background() @@ -588,6 +755,195 @@ main { } } +func TestPartialWithMetrics(t *testing.T) { + + ctx := context.Background() + + server := sdktest.MustNewServer( + sdktest.MockBundle("/bundles/bundle.tar.gz", map[string]string{ + "main.rego": ` +package test + +allow { + data.junk.x = input.y +} +`, + }), + ) + + defer server.Stop() + + config := fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "bundles": { + "test": { + "resource": "/bundles/bundle.tar.gz" + } + }, + "decision_logs": { + "console": true + } + }`, server.URL()) + + opa, err := sdk.New(ctx, sdk.Options{ + Config: strings.NewReader(config), + }) + if err != nil { + t.Fatal(err) + } + + defer opa.Stop(ctx) + + m := metrics.New() + var result *sdk.PartialResult + if result, err = opa.Partial(ctx, sdk.PartialOptions{ + Input: map[string]int{"y": 2}, + Query: "data.test.allow = true", + Unknowns: []string{"data.junk.x"}, + Mapper: &sdk.RawMapper{}, + Now: time.Unix(0, 1619868194450288000).UTC(), + Metrics: m, + }); err != nil { + t.Fatal(err) + } else if decision, ok := result.Result.(*rego.PartialQueries); !ok || decision.Queries[0].String() != "2 = data.junk.x" { + t.Fatal("expected &{[2 = data.junk.x] []} true but got:", decision, ok) + } + + if exp, act := 5, len(m.All()); exp != act { + t.Fatalf("expected %d metrics, got %d", exp, act) + } + + expectedRecordedMetricGroups := map[string]bool{ + "timer_rego": false, + "timer_sdk": false, + } + for k := range m.All() { + for group, found := range expectedRecordedMetricGroups { + if found { + continue + } + if strings.HasPrefix(k, group) { + expectedRecordedMetricGroups[group] = true + } + } + } + for group, found := range expectedRecordedMetricGroups { + if !found { + t.Errorf("expected metric group %s not recorded", group) + } + } + +} + +func TestPartialWithInstrumentationAndProfile(t *testing.T) { + + ctx := context.Background() + + server := sdktest.MustNewServer( + sdktest.MockBundle("/bundles/bundle.tar.gz", map[string]string{ + "main.rego": ` +package test + +allow { + data.junk.x = input.y +} +`, + }), + ) + + defer server.Stop() + + config := fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "bundles": { + "test": { + "resource": "/bundles/bundle.tar.gz" + } + }, + "decision_logs": { + "console": true + } + }`, server.URL()) + + opa, err := sdk.New(ctx, sdk.Options{ + Config: strings.NewReader(config), + }) + if err != nil { + t.Fatal(err) + } + + defer opa.Stop(ctx) + + m := metrics.New() + p := profiler.New() + var result *sdk.PartialResult + if result, err = opa.Partial(ctx, sdk.PartialOptions{ + Input: map[string]int{"y": 2}, + Query: "data.test.allow = true", + Unknowns: []string{"data.junk.x"}, + Mapper: &sdk.RawMapper{}, + Now: time.Unix(0, 1619868194450288000).UTC(), + Metrics: m, + Profiler: p, + Instrument: true, + }); err != nil { + t.Fatal(err) + } else if decision, ok := result.Result.(*rego.PartialQueries); !ok || decision.Queries[0].String() != "2 = data.junk.x" { + t.Fatal("expected &{[2 = data.junk.x] []} true but got:", decision, ok) + } + + if exp, act := 32, len(m.All()); exp != act { + t.Fatalf("expected %d metrics, got %d", exp, act) + } + + expectedRecordedMetricGroups := map[string]bool{ + "histogram_eval": false, + "histogram_partial": false, + "timer_query_compile": false, + "timer_eval": false, + "timer_partial": false, + "timer_rego": false, + "timer_sdk": false, + } + + for k := range m.All() { + for group, found := range expectedRecordedMetricGroups { + if found { + continue + } + if strings.HasPrefix(k, group) { + expectedRecordedMetricGroups[group] = true + } + } + } + for group, found := range expectedRecordedMetricGroups { + if !found { + t.Errorf("expected metric group %s not recorded", group) + } + } + + stats := p.ReportTopNResults(10, []string{"line"}) + + if exp, act := 2, len(stats); exp != act { + t.Fatalf("expected %d stats, got %d", exp, act) + } + if exp, act := "data.junk.x = input.y", string(stats[0].Location.Text); exp != act { + t.Errorf("expected location %q got %q", exp, act) + } + if exp, act := "data.test.allow = true", string(stats[1].Location.Text); exp != act { + t.Errorf("expected location %q got %q", exp, act) + } + +} + func TestUndefinedError(t *testing.T) { ctx := context.Background() @@ -805,10 +1161,8 @@ main = 7 } }`, server.URL()) - testLogger := loggingtest.New() opa, err := sdk.New(ctx, sdk.Options{ - Config: strings.NewReader(config), - ConsoleLogger: testLogger, + Config: strings.NewReader(config), }) if err != nil { t.Fatal(err) @@ -816,27 +1170,27 @@ main = 7 defer opa.Stop(ctx) - // Execute two queries. - if _, err := opa.Decision(ctx, sdk.DecisionOptions{}); err != nil { + // Execute two queries with metrics + m1 := metrics.New() + if _, err := opa.Decision(ctx, sdk.DecisionOptions{ + Metrics: m1, + }); err != nil { t.Fatal(err) } - if _, err := opa.Decision(ctx, sdk.DecisionOptions{}); err != nil { + m2 := metrics.New() + if _, err := opa.Decision(ctx, sdk.DecisionOptions{ + Metrics: m2, + }); err != nil { t.Fatal(err) } - // Expect two log entries, one with timers for query preparation and the other without. - entries := testLogger.Entries() - - if len(entries) != 2 { - t.Fatal("expected two log entries but got:", entries) + // Expect only the metrics from the first query to contain preparation metrics + if _, ok := m1.All()["timer_rego_query_parse_ns"]; !ok { + t.Fatal("first query should have preparation metrics") } - - _, ok1 := entries[0].Fields["metrics"].(map[string]interface{})["timer_rego_query_parse_ns"] - _, ok2 := entries[1].Fields["metrics"].(map[string]interface{})["timer_rego_query_parse_ns"] - - if !ok1 || ok2 { - t.Fatal("expected first query to require preparation but not the second") + if _, ok := m2.All()["timer_rego_query_parse_ns"]; ok { + t.Fatal("second query should not have preparation metrics") } }