Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

server+plugins: Integrate server + NDBCache in decision logs. #5147

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/config.go
Expand Up @@ -32,6 +32,7 @@ type Config struct {
DefaultDecision *string `json:"default_decision,omitempty"`
DefaultAuthorizationDecision *string `json:"default_authorization_decision,omitempty"`
Caching json.RawMessage `json:"caching,omitempty"`
NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"`
PersistenceDirectory *string `json:"persistence_directory,omitempty"`
DistributedTracing json.RawMessage `json:"distributed_tracing,omitempty"`
Storage *struct {
Expand Down Expand Up @@ -87,6 +88,11 @@ func (c Config) DefaultAuthorizationDecisionRef() ast.Ref {
return r
}

// NDBuiltinCacheEnabled returns if the ND builtins cache should be used.
func (c Config) NDBuiltinCacheEnabled() bool {
return c.NDBuiltinCache
}

func (c *Config) validateAndInjectDefaults(id string) error {

if c.DefaultDecision == nil {
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration.md
Expand Up @@ -740,6 +740,7 @@ For *GHCR* (Github Container Registry) you can use a developer PAT (personal acc
| `default_authorization_decision` | `string` | No (default: `/system/authz/allow`) | Set path of default authorization decision for OPA's API. |
| `persistence_directory` | `string` | No (default `$PWD/.opa`) | Set directory to use for persistence with options like `bundles[_].persist`. |
| `plugins` | `object` | No (default: `{}`) | Location for custom plugin configuration. See [Plugins](../plugins) for details. |
| `nd_builtin_cache` | `boolean` | No (default: `false`) | Enable the non-deterministic builtins caching system during policy evaluation, and include the contents of the cache in decision logs. Note that decision logs that are larger than `upload_size_limit_bytes` will drop the `nd_builtin_cache` key from the log entry before uploading. |

### Keys

Expand Down
9 changes: 8 additions & 1 deletion docs/content/management-decision-logs.md
Expand Up @@ -75,6 +75,7 @@ Decision log updates contain the following fields:
| `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). |
| `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. |
| `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. |
| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. |

If the decision log was successfully uploaded to the remote service, it should respond with an HTTP 2xx status. If the
service responds with a non-2xx status, OPA will requeue the last chunk containing decision log events and upload it
Expand All @@ -95,6 +96,12 @@ the last chunk.

`Equilibrium`: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value.

When an event containing `nd_builtin_cache` cannot fit into a chunk smaller than `upload_size_limit_bytes`, OPA will
drop the `nd_builtin_cache` key from the event, and will retry encoding the chunk without the non-deterministic
builtins cache information. This best-effort approach ensures that OPA reports decision log events as much as possible,
and bounds how large decision log events can get. This size-bounding is necessary, because some non-deterministic builtins
(such as `http.send`) can increase the decision log event size by a potentially unbounded amount.

### Local Decision Logs

Local console logging of decisions can be enabled via the `console` config option.
Expand Down Expand Up @@ -172,7 +179,7 @@ from the decision log event. The erased paths are recorded on the event itself:

There are a few restrictions on the JSON Pointers that OPA will erase:

* Pointers must be prefixed with `/input` or `/result`.
* Pointers must be prefixed with `/input`, `/result`, or `/nd_builtin_cache`.
* Pointers may be undefined. For example `/input/name/first` in the example
above would be undefined. Undefined pointers are ignored.
* Pointers must refer to object keys. Pointers to array elements will be treated
Expand Down
23 changes: 17 additions & 6 deletions plugins/logs/mask.go
Expand Up @@ -21,8 +21,9 @@ const (
maskOPRemove maskOP = "remove"
maskOPUpsert maskOP = "upsert"

partInput = "input"
partResult = "result"
partInput = "input"
partResult = "result"
partNDBCache = "nd_builtin_cache"
)

var (
Expand Down Expand Up @@ -64,7 +65,9 @@ func newMaskRule(path string, opts ...maskRuleOption) (*maskRule, error) {

parts := strings.Split(path[1:], "/")

if parts[0] != partInput && parts[0] != partResult {
switch parts[0] {
case partInput, partResult, partNDBCache: // OK
default:
return nil, fmt.Errorf("mask prefix not allowed: %v", parts[0])
}

Expand Down Expand Up @@ -130,8 +133,8 @@ func withFailUndefinedPath() maskRuleOption {

func (r maskRule) Mask(event *EventV1) error {

var maskObj *interface{} // pointer to event Input|Result object
var maskObjPtr **interface{} // pointer to the event Input|Result pointer itself
var maskObj *interface{} // pointer to event Input|Result|NDBCache object
var maskObjPtr **interface{} // pointer to the event Input|Result|NDBCache pointer itself

switch p := r.escapedParts[0]; p {
case partInput:
Expand All @@ -143,7 +146,6 @@ func (r maskRule) Mask(event *EventV1) error {
}
maskObj = event.Input
maskObjPtr = &event.Input

case partResult:
if event.Result == nil {
if r.failUndefinedPath {
Expand All @@ -153,6 +155,15 @@ func (r maskRule) Mask(event *EventV1) error {
}
maskObj = event.Result
maskObjPtr = &event.Result
case partNDBCache:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the careful interface/type-wrangling we do earlier in the server's Log function, we can just copy/paste the masking logic here for the NDBCache. It behaves exactly like the Input and Result masking targets, so the logic is safe to reuse with minimal changes. 馃槃

if event.NDBuiltinCache == nil {
if r.failUndefinedPath {
return errMaskInvalidObject
}
return nil
}
maskObj = event.NDBuiltinCache
maskObjPtr = &event.NDBuiltinCache
default:
return fmt.Errorf("illegal path value: %s", p)
}
Expand Down
98 changes: 66 additions & 32 deletions plugins/logs/plugin.go
Expand Up @@ -44,21 +44,22 @@ type Logger interface {
// the struct. Any changes here MUST be reflected in the AST()
// implementation below.
type EventV1 struct {
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
MappedResult *interface{} `json:"mapped_result,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
MappedResult *interface{} `json:"mapped_result,omitempty"`
NDBuiltinCache *interface{} `json:"nd_builtin_cache,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`

inputAST ast.Value
}
Expand Down Expand Up @@ -87,6 +88,7 @@ var queryKey = ast.StringTerm("query")
var inputKey = ast.StringTerm("input")
var resultKey = ast.StringTerm("result")
var mappedResultKey = ast.StringTerm("mapped_result")
var ndBuiltinCacheKey = ast.StringTerm("nd_builtin_cache")
var erasedKey = ast.StringTerm("erased")
var maskedKey = ast.StringTerm("masked")
var errorKey = ast.StringTerm("error")
Expand Down Expand Up @@ -159,6 +161,14 @@ func (e *EventV1) AST() (ast.Value, error) {
event.Insert(mappedResultKey, ast.NewTerm(mResults))
}

if e.NDBuiltinCache != nil {
ndbCache, err := roundtripJSONToAST(e.NDBuiltinCache)
if err != nil {
return nil, err
}
event.Insert(ndBuiltinCacheKey, ast.NewTerm(ndbCache))
}

if len(e.Erased) > 0 {
erased := make([]*ast.Term, len(e.Erased))
for i, v := range e.Erased {
Expand Down Expand Up @@ -226,6 +236,7 @@ const (
defaultBufferSizeLimitBytes = int64(0) // unlimited
defaultMaskDecisionPath = "/system/log/mask"
logDropCounterName = "decision_logs_dropped"
logNDBDropCounterName = "decision_logs_nd_builtin_cache_dropped"
defaultResourcePath = "/logs"
)

Expand All @@ -248,6 +259,7 @@ type Config struct {
MaskDecision *string `json:"mask_decision"`
ConsoleLogs bool `json:"console"`
Resource *string `json:"resource"`
NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"`
maskDecisionRef ast.Ref
}

Expand Down Expand Up @@ -557,18 +569,19 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
}

event := EventV1{
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
MappedResult: decision.MappedResults,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
MappedResult: decision.MappedResults,
NDBuiltinCache: decision.NDBuiltinCache,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
}

if decision.Metrics != nil {
Expand Down Expand Up @@ -793,6 +806,9 @@ func (p *Plugin) reconfigure(config interface{}) {
p.config = *newConfig
}

// NOTE(philipc): Because ND builtins caching can cause unbounded growth in
// decision log entry size, we do best-effort event encoding here, and when we
// run out of space, we drop the ND builtins cache, and try encoding again.
func (p *Plugin) encodeAndBufferEvent(event EventV1) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is where the retry-encoding-without-NDBCache logic lives. If encoding the event succeeds after dropping the NDBCache, we print an error message + increment a counter metric, so that this behavior can be tracked.

if p.limiter != nil {
if !p.limiter.Allow() {
Expand All @@ -807,11 +823,29 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {

result, err := p.enc.Write(event)
if err != nil {
// TODO(tsandall): revisit this now that we have an API that
// can return an error. Should the default behaviour be to
// fail-closed as we do for plugins?
p.logger.Error("Log encoding failed: %v.", err)
return
// If there's no ND builtins cache in the event, then we don't
// need to retry encoding anything.
if event.NDBuiltinCache == nil {
// TODO(tsandall): revisit this now that we have an API that
// can return an error. Should the default behaviour be to
// fail-closed as we do for plugins?
p.logger.Error("Log encoding failed: %v.", err)
return
}

// Attempt to encode the event again, dropping the ND builtins cache.
newEvent := event
philipaconrad marked this conversation as resolved.
Show resolved Hide resolved
newEvent.NDBuiltinCache = nil

result, err = p.enc.Write(newEvent)
if err != nil {
p.logger.Error("Log encoding failed: %v.", err)
return
}

// Re-encoding was successful, but we still need to alert users.
p.logger.Error("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.")
p.metrics.Counter(logNDBDropCounterName).Incr()
}

for _, chunk := range result {
Expand Down