Skip to content

Commit

Permalink
server+sdk+plugins: Integrate NDBCache into decision logging. (#5147)
Browse files Browse the repository at this point in the history
This commit integrates the non-deterministic builtins caching system
into decision logging, both in the server and sdk packages. Some
reworking of the NDBCache's serialization format were required to
accommodate this. The feature is disabled by default, and must be
opted into by user configuration.

The feature can be enabled via a top-level config key:

    nd_builtin_cache=true

The NDBCache is exposed to the masking system under the
`/nd_builtin_cache` path, which allows masking or dropping sensitive
values from decision logs selectively.

Note: If a decision log event exceeds the `upload_size_limit_bytes`
value for the OPA instance, OPA will reattempt uploading it, after
dropping the NDBCache from the event. This behavior will trigger a log
error, and will increment the `decision_logs_nd_builtin_cache_dropped`
metrics counter.

Fixes: #1514

Signed-off-by: Philip Conrad <philipaconrad@gmail.com>
  • Loading branch information
philipaconrad committed Oct 6, 2022
1 parent 94baa1b commit ac20ef2
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 109 deletions.
6 changes: 6 additions & 0 deletions config/config.go
Expand Up @@ -32,6 +32,7 @@ type Config struct {
DefaultDecision *string `json:"default_decision,omitempty"`
DefaultAuthorizationDecision *string `json:"default_authorization_decision,omitempty"`
Caching json.RawMessage `json:"caching,omitempty"`
NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"`
PersistenceDirectory *string `json:"persistence_directory,omitempty"`
DistributedTracing json.RawMessage `json:"distributed_tracing,omitempty"`
Storage *struct {
Expand Down Expand Up @@ -87,6 +88,11 @@ func (c Config) DefaultAuthorizationDecisionRef() ast.Ref {
return r
}

// NDBuiltinCacheEnabled returns if the ND builtins cache should be used.
func (c Config) NDBuiltinCacheEnabled() bool {
return c.NDBuiltinCache
}

func (c *Config) validateAndInjectDefaults(id string) error {

if c.DefaultDecision == nil {
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration.md
Expand Up @@ -740,6 +740,7 @@ For *GHCR* (Github Container Registry) you can use a developer PAT (personal acc
| `default_authorization_decision` | `string` | No (default: `/system/authz/allow`) | Set path of default authorization decision for OPA's API. |
| `persistence_directory` | `string` | No (default `$PWD/.opa`) | Set directory to use for persistence with options like `bundles[_].persist`. |
| `plugins` | `object` | No (default: `{}`) | Location for custom plugin configuration. See [Plugins](../plugins) for details. |
| `nd_builtin_cache` | `boolean` | No (default: `false`) | Enable the non-deterministic builtins caching system during policy evaluation, and include the contents of the cache in decision logs. Note that decision logs that are larger than `upload_size_limit_bytes` will drop the `nd_builtin_cache` key from the log entry before uploading. |

### Keys

Expand Down
9 changes: 8 additions & 1 deletion docs/content/management-decision-logs.md
Expand Up @@ -75,6 +75,7 @@ Decision log updates contain the following fields:
| `[_].metrics` | `object` | Key-value pairs of [performance metrics](../rest-api#performance-metrics). |
| `[_].erased` | `array[string]` | Set of JSON Pointers specifying fields in the event that were erased. |
| `[_].masked` | `array[string]` | Set of JSON Pointers specifying fields in the event that were masked. |
| `[_].nd_builtin_cache` | `object` | Key-value pairs of non-deterministic builtin names, paired with objects specifying the input/output mappings for each unique invocation of that builtin during policy evaluation. Intended for use in debugging and decision replay. Receivers will need to decode the JSON using Rego's JSON decoders. |

If the decision log was successfully uploaded to the remote service, it should respond with an HTTP 2xx status. If the
service responds with a non-2xx status, OPA will requeue the last chunk containing decision log events and upload it
Expand All @@ -95,6 +96,12 @@ the last chunk.

`Equilibrium`: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value.

When an event containing `nd_builtin_cache` cannot fit into a chunk smaller than `upload_size_limit_bytes`, OPA will
drop the `nd_builtin_cache` key from the event, and will retry encoding the chunk without the non-deterministic
builtins cache information. This best-effort approach ensures that OPA reports decision log events as much as possible,
and bounds how large decision log events can get. This size-bounding is necessary, because some non-deterministic builtins
(such as `http.send`) can increase the decision log event size by a potentially unbounded amount.

### Local Decision Logs

Local console logging of decisions can be enabled via the `console` config option.
Expand Down Expand Up @@ -172,7 +179,7 @@ from the decision log event. The erased paths are recorded on the event itself:

There are a few restrictions on the JSON Pointers that OPA will erase:

* Pointers must be prefixed with `/input` or `/result`.
* Pointers must be prefixed with `/input`, `/result`, or `/nd_builtin_cache`.
* Pointers may be undefined. For example `/input/name/first` in the example
above would be undefined. Undefined pointers are ignored.
* Pointers must refer to object keys. Pointers to array elements will be treated
Expand Down
23 changes: 17 additions & 6 deletions plugins/logs/mask.go
Expand Up @@ -21,8 +21,9 @@ const (
maskOPRemove maskOP = "remove"
maskOPUpsert maskOP = "upsert"

partInput = "input"
partResult = "result"
partInput = "input"
partResult = "result"
partNDBCache = "nd_builtin_cache"
)

var (
Expand Down Expand Up @@ -64,7 +65,9 @@ func newMaskRule(path string, opts ...maskRuleOption) (*maskRule, error) {

parts := strings.Split(path[1:], "/")

if parts[0] != partInput && parts[0] != partResult {
switch parts[0] {
case partInput, partResult, partNDBCache: // OK
default:
return nil, fmt.Errorf("mask prefix not allowed: %v", parts[0])
}

Expand Down Expand Up @@ -130,8 +133,8 @@ func withFailUndefinedPath() maskRuleOption {

func (r maskRule) Mask(event *EventV1) error {

var maskObj *interface{} // pointer to event Input|Result object
var maskObjPtr **interface{} // pointer to the event Input|Result pointer itself
var maskObj *interface{} // pointer to event Input|Result|NDBCache object
var maskObjPtr **interface{} // pointer to the event Input|Result|NDBCache pointer itself

switch p := r.escapedParts[0]; p {
case partInput:
Expand All @@ -143,7 +146,6 @@ func (r maskRule) Mask(event *EventV1) error {
}
maskObj = event.Input
maskObjPtr = &event.Input

case partResult:
if event.Result == nil {
if r.failUndefinedPath {
Expand All @@ -153,6 +155,15 @@ func (r maskRule) Mask(event *EventV1) error {
}
maskObj = event.Result
maskObjPtr = &event.Result
case partNDBCache:
if event.NDBuiltinCache == nil {
if r.failUndefinedPath {
return errMaskInvalidObject
}
return nil
}
maskObj = event.NDBuiltinCache
maskObjPtr = &event.NDBuiltinCache
default:
return fmt.Errorf("illegal path value: %s", p)
}
Expand Down
98 changes: 66 additions & 32 deletions plugins/logs/plugin.go
Expand Up @@ -44,21 +44,22 @@ type Logger interface {
// the struct. Any changes here MUST be reflected in the AST()
// implementation below.
type EventV1 struct {
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
MappedResult *interface{} `json:"mapped_result,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
MappedResult *interface{} `json:"mapped_result,omitempty"`
NDBuiltinCache *interface{} `json:"nd_builtin_cache,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`

inputAST ast.Value
}
Expand Down Expand Up @@ -87,6 +88,7 @@ var queryKey = ast.StringTerm("query")
var inputKey = ast.StringTerm("input")
var resultKey = ast.StringTerm("result")
var mappedResultKey = ast.StringTerm("mapped_result")
var ndBuiltinCacheKey = ast.StringTerm("nd_builtin_cache")
var erasedKey = ast.StringTerm("erased")
var maskedKey = ast.StringTerm("masked")
var errorKey = ast.StringTerm("error")
Expand Down Expand Up @@ -159,6 +161,14 @@ func (e *EventV1) AST() (ast.Value, error) {
event.Insert(mappedResultKey, ast.NewTerm(mResults))
}

if e.NDBuiltinCache != nil {
ndbCache, err := roundtripJSONToAST(e.NDBuiltinCache)
if err != nil {
return nil, err
}
event.Insert(ndBuiltinCacheKey, ast.NewTerm(ndbCache))
}

if len(e.Erased) > 0 {
erased := make([]*ast.Term, len(e.Erased))
for i, v := range e.Erased {
Expand Down Expand Up @@ -226,6 +236,7 @@ const (
defaultBufferSizeLimitBytes = int64(0) // unlimited
defaultMaskDecisionPath = "/system/log/mask"
logDropCounterName = "decision_logs_dropped"
logNDBDropCounterName = "decision_logs_nd_builtin_cache_dropped"
defaultResourcePath = "/logs"
)

Expand All @@ -248,6 +259,7 @@ type Config struct {
MaskDecision *string `json:"mask_decision"`
ConsoleLogs bool `json:"console"`
Resource *string `json:"resource"`
NDBuiltinCache bool `json:"nd_builtin_cache,omitempty"`
maskDecisionRef ast.Ref
}

Expand Down Expand Up @@ -557,18 +569,19 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
}

event := EventV1{
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
MappedResult: decision.MappedResults,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
MappedResult: decision.MappedResults,
NDBuiltinCache: decision.NDBuiltinCache,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
}

if decision.Metrics != nil {
Expand Down Expand Up @@ -793,6 +806,9 @@ func (p *Plugin) reconfigure(config interface{}) {
p.config = *newConfig
}

// NOTE(philipc): Because ND builtins caching can cause unbounded growth in
// decision log entry size, we do best-effort event encoding here, and when we
// run out of space, we drop the ND builtins cache, and try encoding again.
func (p *Plugin) encodeAndBufferEvent(event EventV1) {
if p.limiter != nil {
if !p.limiter.Allow() {
Expand All @@ -807,11 +823,29 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {

result, err := p.enc.Write(event)
if err != nil {
// TODO(tsandall): revisit this now that we have an API that
// can return an error. Should the default behaviour be to
// fail-closed as we do for plugins?
p.logger.Error("Log encoding failed: %v.", err)
return
// If there's no ND builtins cache in the event, then we don't
// need to retry encoding anything.
if event.NDBuiltinCache == nil {
// TODO(tsandall): revisit this now that we have an API that
// can return an error. Should the default behaviour be to
// fail-closed as we do for plugins?
p.logger.Error("Log encoding failed: %v.", err)
return
}

// Attempt to encode the event again, dropping the ND builtins cache.
newEvent := event
newEvent.NDBuiltinCache = nil

result, err = p.enc.Write(newEvent)
if err != nil {
p.logger.Error("Log encoding failed: %v.", err)
return
}

// Re-encoding was successful, but we still need to alert users.
p.logger.Error("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.")
p.metrics.Counter(logNDBDropCounterName).Incr()
}

for _, chunk := range result {
Expand Down

0 comments on commit ac20ef2

Please sign in to comment.