diff --git a/plugins/logs/plugin.go b/plugins/logs/plugin.go index 6f55ba18ba..07d5a639a0 100644 --- a/plugins/logs/plugin.go +++ b/plugins/logs/plugin.go @@ -44,20 +44,21 @@ type Logger interface { // the struct. Any changes here MUST be reflected in the AST() // implementation below. type EventV1 struct { - Labels map[string]string `json:"labels"` - DecisionID string `json:"decision_id"` - Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead - Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"` - Path string `json:"path,omitempty"` - Query string `json:"query,omitempty"` - Input *interface{} `json:"input,omitempty"` - Result *interface{} `json:"result,omitempty"` - Erased []string `json:"erased,omitempty"` - Masked []string `json:"masked,omitempty"` - Error error `json:"error,omitempty"` - RequestedBy string `json:"requested_by,omitempty"` - Timestamp time.Time `json:"timestamp"` - Metrics map[string]interface{} `json:"metrics,omitempty"` + Labels map[string]string `json:"labels"` + DecisionID string `json:"decision_id"` + Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead + Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"` + Path string `json:"path,omitempty"` + Query string `json:"query,omitempty"` + Input *interface{} `json:"input,omitempty"` + Result *interface{} `json:"result,omitempty"` + MappedResult *interface{} `json:"mapped_result,omitempty"` + Erased []string `json:"erased,omitempty"` + Masked []string `json:"masked,omitempty"` + Error error `json:"error,omitempty"` + RequestedBy string `json:"requested_by,omitempty"` + Timestamp time.Time `json:"timestamp"` + Metrics map[string]interface{} `json:"metrics,omitempty"` inputAST ast.Value } @@ -85,6 +86,7 @@ var pathKey = ast.StringTerm("path") var queryKey = ast.StringTerm("query") var inputKey = ast.StringTerm("input") var resultKey = ast.StringTerm("result") +var mappedResultKey = ast.StringTerm("mapped_result") var erasedKey = ast.StringTerm("erased") var maskedKey = ast.StringTerm("masked") var errorKey = ast.StringTerm("error") @@ -149,6 +151,14 @@ func (e *EventV1) AST() (ast.Value, error) { event.Insert(resultKey, ast.NewTerm(results)) } + if e.MappedResult != nil { + mResults, err := roundtripJSONToAST(e.MappedResult) + if err != nil { + return nil, err + } + event.Insert(mappedResultKey, ast.NewTerm(mResults)) + } + if len(e.Erased) > 0 { erased := make([]*ast.Term, len(e.Erased)) for i, v := range e.Erased { @@ -547,17 +557,18 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error { } event := EventV1{ - Labels: p.manager.Labels(), - DecisionID: decision.DecisionID, - Revision: decision.Revision, - Bundles: bundles, - Path: decision.Path, - Query: decision.Query, - Input: decision.Input, - Result: decision.Results, - RequestedBy: decision.RemoteAddr, - Timestamp: decision.Timestamp, - inputAST: decision.InputAST, + Labels: p.manager.Labels(), + DecisionID: decision.DecisionID, + Revision: decision.Revision, + Bundles: bundles, + Path: decision.Path, + Query: decision.Query, + Input: decision.Input, + Result: decision.Results, + MappedResult: decision.MappedResults, + RequestedBy: decision.RemoteAddr, + Timestamp: decision.Timestamp, + inputAST: decision.InputAST, } if decision.Metrics != nil { diff --git a/sdk/RawMapper.go b/sdk/RawMapper.go new file mode 100644 index 0000000000..14221e41c9 --- /dev/null +++ b/sdk/RawMapper.go @@ -0,0 +1,17 @@ +package sdk + +import ( + "github.com/open-policy-agent/opa/rego" +) + +type RawMapper struct { +} + +func (e *RawMapper) MapResults(pq *rego.PartialQueries) (interface{}, error) { + + return pq, nil +} + +func (e *RawMapper) ResultToJSON(results interface{}) (interface{}, error) { + return results, nil +} diff --git a/sdk/opa.go b/sdk/opa.go index 522dee7320..0dc4b89a82 100644 --- a/sdk/opa.go +++ b/sdk/opa.go @@ -215,6 +215,64 @@ func (opa *OPA) Stop(ctx context.Context) { // Decision returns a named decision. This function is threadsafe. func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*DecisionResult, error) { + record := server.Info{ + Timestamp: options.Now, + Path: options.Path, + Input: &options.Input, + } + + result, err := opa.executeTransaction( + ctx, + &record, + func(result *DecisionResult) { + result.Result, record.InputAST, record.Bundles, record.Error = evaluate(ctx, evalArgs{ + runtime: opa.state.manager.Info, + printHook: opa.state.manager.PrintHook(), + compiler: opa.state.manager.GetCompiler(), + store: opa.state.manager.Store, + txn: record.Txn, + queryCache: opa.state.queryCache, + interQueryCache: opa.state.interQueryBuiltinCache, + now: record.Timestamp, + path: record.Path, + input: *record.Input, + m: record.Metrics, + }) + if record.Error == nil { + record.Results = &result.Result + } + }, + ) + if err != nil { + return nil, err + } + + return result, record.Error +} + +// DecisionOptions contains parameters for query evaluation. +type DecisionOptions struct { + Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc. + Path string // specifies name of policy decision to evaluate (e.g., example/allow) + Input interface{} // specifies value of the input document to evaluate policy with +} + +// DecisionResult contains the output of query evaluation. +type DecisionResult struct { + ID string // provides a globally unique identifier for this decision (which is included in the decision log.) + Result interface{} // provides the output of query evaluation. +} + +func newDecisionResult() (*DecisionResult, error) { + id, err := uuid.New(rand.Reader) + if err != nil { + return nil, err + } + result := &DecisionResult{ID: id} + return result, nil +} + +func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, work func(result *DecisionResult)) (*DecisionResult, error) { m := metrics.New() m.Timer(metrics.SDKDecisionEval).Start() @@ -227,13 +285,8 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio s := *opa.state opa.mtx.Unlock() - record := server.Info{ - DecisionID: result.ID, - Timestamp: options.Now, - Path: options.Path, - Input: &options.Input, - Metrics: m, - } + record.DecisionID = result.ID + record.Metrics = m if record.Timestamp.IsZero() { record.Timestamp = time.Now().UTC() @@ -247,55 +300,93 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio if record.Error == nil { defer s.manager.Store.Abort(ctx, record.Txn) - result.Result, record.InputAST, record.Bundles, record.Error = evaluate(ctx, evalArgs{ - runtime: s.manager.Info, - printHook: s.manager.PrintHook(), - compiler: s.manager.GetCompiler(), - store: s.manager.Store, - txn: record.Txn, - queryCache: s.queryCache, - interQueryCache: s.interQueryBuiltinCache, - now: record.Timestamp, - path: record.Path, - input: *record.Input, - m: record.Metrics, - }) - if record.Error == nil { - record.Results = &result.Result - } + work(result) } m.Timer(metrics.SDKDecisionEval).Stop() if logger := logs.Lookup(s.manager); logger != nil { - if err := logger.Log(ctx, &record); err != nil { + if err := logger.Log(ctx, record); err != nil { return result, fmt.Errorf("decision log: %w", err) } } - - return result, record.Error + return result, nil } -// DecisionOptions contains parameters for query evaluation. -type DecisionOptions struct { - Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc. - Path string // specifies name of policy decision to evaluate (e.g., example/allow) - Input interface{} // specifies value of the input document to evaluate policy with -} +// Partial returns a named decision. This function is threadsafe. +func (opa *OPA) Partial(ctx context.Context, options PartialOptions) (*PartialResult, error) { -// DecisionResult contains the output of query evaluation. -type DecisionResult struct { - ID string // provides a globally unique identifier for this decision (which is included in the decision log.) - Result interface{} // provides the output of query evaluation. -} + if options.Mapper == nil { + options.Mapper = &RawMapper{} + } -func newDecisionResult() (*DecisionResult, error) { - id, err := uuid.New(rand.Reader) + record := server.Info{ + Timestamp: options.Now, + Input: &options.Input, + Query: options.Query, + } + + var pq *rego.PartialQueries + decision, err := opa.executeTransaction( + ctx, + &record, + func(result *DecisionResult) { + pq, record.InputAST, record.Bundles, record.Error = partial(ctx, partialEvalArgs{ + runtime: opa.state.manager.Info, + printHook: opa.state.manager.PrintHook(), + compiler: opa.state.manager.GetCompiler(), + store: opa.state.manager.Store, + txn: record.Txn, + now: record.Timestamp, + query: record.Query, + unknowns: options.Unknowns, + input: *record.Input, + m: record.Metrics, + }) + if record.Error == nil { + result.Result, record.Error = options.Mapper.MapResults(pq) + var pqAst interface{} + if record.Error == nil { + var mappedResults interface{} + mappedResults, record.Error = options.Mapper.ResultToJSON(result.Result) + record.MappedResults = &mappedResults + pqAst = pq + record.Results = &pqAst + } + } + }, + ) if err != nil { return nil, err } - result := &DecisionResult{ID: id} - return result, nil + + return &PartialResult{ + ID: decision.ID, + Result: decision.Result, + AST: pq, + }, record.Error +} + +type PartialQueryMapper interface { + // The first interface being returned is the type that will be used for further processing + MapResults(pq *rego.PartialQueries) (interface{}, error) + // This should be able to take the Result object from MapResults and return a type that can be logged as JSON + ResultToJSON(result interface{}) (interface{}, error) +} + +// PartialOptions contains parameters for partial query evaluation. +type PartialOptions struct { + Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc. + Input interface{} // specifies value of the input document to evaluate policy with + Query string // specifies the query to be partially evaluated + Unknowns []string // specifies the unknown elements of the policy + Mapper PartialQueryMapper // specifies the mapper to use when processing results +} + +type PartialResult struct { + ID string // decision ID + Result interface{} // mapped result + AST *rego.PartialQueries // raw result } // Error represents an internal error in the SDK. @@ -393,6 +484,50 @@ func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[s return rs[0].Expressions[0].Value, inputAST, bundles, nil } +type partialEvalArgs struct { + runtime *ast.Term + compiler *ast.Compiler + printHook print.Hook + store storage.Store + txn storage.Transaction + unknowns []string + query string + now time.Time + input interface{} + m metrics.Metrics +} + +func partial(ctx context.Context, args partialEvalArgs) (*rego.PartialQueries, ast.Value, map[string]server.BundleInfo, error) { + + bundles, err := bundles(ctx, args.store, args.txn) + if err != nil { + return nil, nil, nil, err + } + + inputAST, err := ast.InterfaceToValue(args.input) + if err != nil { + return nil, nil, bundles, err + } + re := rego.New( + rego.Time(args.now), + rego.Metrics(args.m), + rego.Store(args.store), + rego.Compiler(args.compiler), + rego.Transaction(args.txn), + rego.Runtime(args.runtime), + rego.Input(args.input), + rego.Query(args.query), + rego.Unknowns(args.unknowns), + rego.PrintHook(args.printHook), + ) + + pq, err := re.Partial(ctx) + if err != nil { + return nil, nil, bundles, err + } + return pq, inputAST, bundles, err +} + type queryCache struct { sync.Mutex cache map[string]*rego.PreparedEvalQuery diff --git a/sdk/opa_test.go b/sdk/opa_test.go index 96ae42d406..acee03a8db 100644 --- a/sdk/opa_test.go +++ b/sdk/opa_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/open-policy-agent/opa/logging" + "github.com/open-policy-agent/opa/rego" "github.com/fortytw2/leaktest" loggingtest "github.com/open-policy-agent/opa/logging/test" @@ -159,6 +160,84 @@ func TestDecision(t *testing.T) { } } +func TestPartial(t *testing.T) { + + ctx := context.Background() + + server := sdktest.MustNewServer( + sdktest.MockBundle("/bundles/bundle.tar.gz", map[string]string{ + "main.rego": ` + package test + allow { + data.junk.x = input.y + } + `, + }), + ) + + defer server.Stop() + + config := fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "bundles": { + "test": { + "resource": "/bundles/bundle.tar.gz" + } + }, + "decision_logs": { + "console": true + } + }`, server.URL()) + + testLogger := loggingtest.New() + opa, err := sdk.New(ctx, sdk.Options{ + Config: strings.NewReader(config), + ConsoleLogger: testLogger, + }) + if err != nil { + t.Fatal(err) + } + + defer opa.Stop(ctx) + + var result *sdk.PartialResult + if result, err = opa.Partial(ctx, sdk.PartialOptions{ + Input: map[string]int{"y": 2}, + Query: "data.test.allow = true", + Unknowns: []string{"data.junk.x"}, + Mapper: &sdk.RawMapper{}, + Now: time.Unix(0, 1619868194450288000).UTC(), + }); err != nil { + t.Fatal(err) + } else if decision, ok := result.Result.(*rego.PartialQueries); !ok || decision.Queries[0].String() != "2 = data.junk.x" { + t.Fatal("expected &{[2 = data.junk.x] []} true but got:", decision, ok) + } + + entries := testLogger.Entries() + + if l := len(entries); l != 1 { + t.Fatalf("expected %v but got %v", 1, l) + } + + // just checking for existence, since it's a complex value + if entries[0].Fields["mapped_result"] == nil { + t.Fatalf("expected not nil value for mapped_result but got nil") + } + + if entries[0].Fields["result"] == nil { + t.Fatalf("expected not nil value for result but got nil") + } + + if entries[0].Fields["timestamp"] != "2021-05-01T11:23:14.450288Z" { + t.Fatalf("expected %v but got %v", "2021-05-01T11:23:14.450288Z", entries[0].Fields["timestamp"]) + } + +} + func TestUndefinedError(t *testing.T) { ctx := context.Background() diff --git a/server/buffer.go b/server/buffer.go index 363450c79e..2cb83e1d97 100644 --- a/server/buffer.go +++ b/server/buffer.go @@ -15,20 +15,21 @@ import ( // Info contains information describing a policy decision. type Info struct { - Txn storage.Transaction - Revision string // Deprecated: Use `Bundles` instead - Bundles map[string]BundleInfo - DecisionID string - RemoteAddr string - Query string - Path string - Timestamp time.Time - Input *interface{} - InputAST ast.Value - Results *interface{} - Error error - Metrics metrics.Metrics - Trace []*topdown.Event + Txn storage.Transaction + Revision string // Deprecated: Use `Bundles` instead + Bundles map[string]BundleInfo + DecisionID string + RemoteAddr string + Query string + Path string + Timestamp time.Time + Input *interface{} + InputAST ast.Value + Results *interface{} + MappedResults *interface{} + Error error + Metrics metrics.Metrics + Trace []*topdown.Event } // BundleInfo contains information describing a bundle.